# ProcessScheduler

A 100% pure Python library to compute resource-constrained task schedules. It is distributed under the terms of the GNU General Public License v3 or later.

## Introduction

ProcessScheduler is intended to be used in an industrial context (manufacturing, building industry, healthcare, etc.), and can be used for any related scheduling problem. It targets complex problems for which an obvious solution cannot be found.

The following features are provided:

- Task definition with zero, fixed or variable length, work_amount,
- Resource definition including productivity and cost, assignment of resource(s) to tasks,
- Temporal task constraints (precedence, fixed start, fixed end etc.),
- Resource constraints: resource availability,
- First order logic operations between task/resource constraints: and/or/xor/not boolean operators, implication, if/then/else,
- Multi-objective optimization,
- Gantt chart rendering using matplotlib or plotly,
- Export of the solution to JSON, and of the SMT problem to SMTLIB.

This document explains how to write the model, run the solver, and finally analyze the solution(s).

### What’s inside

ProcessScheduler processes a model written using the Python programming language. It produces a schedule compliant with a set of constraints over tasks and/or resources.

The scheduling problem is solved using the Microsoft Z3 Prover, an MIT-licensed SMT solver. The optimization part of the solver is described in this paper: Bjorner et al. νZ - An Optimizing SMT Solver (2016). A good introduction to programming Z3 with Python can be read at z3-py-tutorial. Z3 is the only mandatory dependency of ProcessScheduler.

The solution of a scheduling problem can be rendered to a Gantt chart using the matplotlib or plotly libraries, and exported to any of the common jpg, png, pdf or svg formats. matplotlib and plotly are optional dependencies and are not installed by default.

### Download/install

Use `pip` to install the package and the required dependencies (Z3) on your machine:

```
pip install ProcessScheduler
```

and check the installation from a python3 prompt:

```
>>> import processscheduler as ps
```

### Development version

Create a local copy of the GitHub repository:

```
git clone https://github.com/tpaviot/ProcessScheduler
```

Then install the development version:

```
cd ProcessScheduler
pip install -e .
```

## SchedulingProblem

The `SchedulingProblem` class is the container for all modeling objects, such as tasks, resources and constraints.

### Time slots as integers

A `SchedulingProblem` instance holds a *time* interval: the lower bound of this interval (the *initial time*) is always 0, the upper bound (the *final time*) can be set by passing the `horizon` parameter to the `__init__()` method:

```
my_problem = ps.SchedulingProblem('MySchedulingProblem', horizon=20)
```

The time interval is divided into a finite number of *periods*. Each period has a duration of 1. If \(horizon\) is the horizon, then the number of periods is \(horizon\) as well, and the number of points in the interval \([0;horizon]\) is \(horizon+1\).

Warning

ProcessScheduler handles variables represented by **integer** values.

A period is the finest granularity level that describes the time line, the task durations, and the schedule itself. The time line is dimensionless. It is up to you to map one period to the desired duration, in seconds/minutes/hours. For example:

- You need to schedule a set of tasks in a single day, let’s say from 8 am to 6 pm (office hours). The time interval is then 10 hours long. If you plan to schedule tasks with a granularity of 1 hour, then the horizon value must be 10 in order to get the desired number of periods.
- You need to schedule a set of tasks in the morning, from 8 am to 12. The time interval is 4 hours. If you plan to schedule tasks with a granularity of 1 minute, then the horizon must be 240.
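The arithmetic behind these two examples can be sketched in plain Python. The helper `horizon_for` is a hypothetical name introduced for illustration only; it is not part of ProcessScheduler.

```python
from datetime import timedelta


def horizon_for(interval: timedelta, period: timedelta) -> int:
    """Return the number of periods needed to cover `interval`.

    Hypothetical helper, not part of ProcessScheduler. The interval has to
    be a whole number of periods because the horizon is an integer.
    """
    periods, remainder = divmod(interval, period)
    if remainder:
        raise ValueError("the interval is not a whole number of periods")
    return periods


# 8 am to 6 pm with a granularity of 1 hour
office_day = horizon_for(timedelta(hours=10), timedelta(hours=1))
# 8 am to 12 with a granularity of 1 minute
morning = horizon_for(timedelta(hours=4), timedelta(minutes=1))
print(office_day, morning)  # 10 240
```

The resulting integers are the values you would pass as `horizon`.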

Note

The `horizon` parameter is optional. If it is not passed to the `__init__()` method, the solver searches for a horizon value compliant with the set of constraints. If the scheduling problem aims at optimizing the horizon (e.g. a makespan objective), the horizon should not be set manually.

### Mapping integers to datetime objects

Because a Gantt chart is much more readable if real dates are represented instead of integers, it is possible to explicitly set the values in seconds, minutes, hours etc. The integer `1`, i.e. the smallest time duration for a task, can be mapped to a `timedelta` Python object. Any instant can be mapped to a `datetime` Python object.

`timedelta` objects are created as follows:

```
from datetime import timedelta
delta = timedelta(days=50,
                  seconds=27,
                  microseconds=10,
                  milliseconds=29000,
                  minutes=5,
                  hours=8,
                  weeks=2)
```

For `datetime` objects:

```
from datetime import datetime
now = datetime.now()
```

These attribute values can be passed to the SchedulingProblem initialization method:

```
problem = ps.SchedulingProblem('DateTimeBase',
                               horizon=7,
                               delta_time=timedelta(minutes=15),
                               start_time=datetime.now())
```

After the solver has completed the solution, the end times, start times and durations are exported either to the Gantt chart or any other output type.
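As an illustration of this mapping, the sketch below converts integer instants and durations to `datetime` and `timedelta` values in plain Python. It assumes the conversion is simply `start_time + n * delta_time`; the helpers `to_datetime` and `to_timedelta` and the chosen dates are hypothetical, not the library's export code.

```python
from datetime import datetime, timedelta

# Assumed values, chosen for illustration only
start_time = datetime(2024, 1, 8, 8, 0)  # Monday, 8 am
delta_time = timedelta(minutes=15)       # duration of one period


def to_datetime(instant: int) -> datetime:
    """Map an integer instant of the time line to a calendar date."""
    return start_time + instant * delta_time


def to_timedelta(duration: int) -> timedelta:
    """Map an integer number of periods to a real duration."""
    return duration * delta_time


# A task scheduled with start=2 and end=6
task_start, task_end = 2, 6
print(to_datetime(task_start))              # 2024-01-08 08:30:00
print(to_datetime(task_end))                # 2024-01-08 09:30:00
print(to_timedelta(task_end - task_start))  # 1:00:00
```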

Note

Users should refer to the datetime python package documentation.

## Task

### Representation of the concept of ‘task’

According to the APICS dictionary, a task may either be:

- In project management, the lowest level to which work can be divided on a project.
- In activity-based cost accounting, a task, a subdivision of an activity, is the least amount of work. Tasks are used to describe activities.

In the context of this software library, the concept of task reflects the first point. The purpose of ProcessScheduler is to compute a sequence (a temporal ordering) of a collection of tasks that satisfy a set of constraints.

#### Start/end/duration

The `Task` class and its derivatives represent any task. A `Task` instance is defined by the three following parameters:

- `start`: a point in the \([0, horizon]\) integer interval. If the task is scheduled, then \(start \geq 0\),
- `end`: a point in the \([0, horizon]\) integer interval. If the task is scheduled, then \(end \geq start\),
- `duration`: an integer number of periods, such that \(duration=end-start\).

```
# The duration of this task will depend on the number of workers that hold boxes
move_boxes = VariableDurationTask('MoveBoxesFromMachineAToInventory')
```

A `VariableDurationTask` duration can be bounded by lower and upper values (a number of periods), by setting the `min_duration` and/or `max_duration` parameters. In the following example, the `wash_room` Task instance means: “Washing a hospital room must be completed in at most 20 minutes, knowing that there’s a total amount of work of 10.” The task duration will depend on how many workers the solver assigns to this task. The more workers are assigned, the shorter the duration will be.

```
wash_room = VariableDurationTask('WashHospitalRoom', work_amount=10, max_duration=20)
```

A `VariableDurationTask` duration can be selected among a list of possible durations. The solver decides the duration.

```
# either 1 or 2 hour for an english lesson
english_lesson = VariableDurationTask('EnglishLesson', allowed_durations = [1, 2])
```

#### Work amount

The `work_amount` is the total amount of work that the `Task` must provide. It is set to `0` by default. The `work_amount` is a dimensionless non-negative integer value. It can be mapped to any unit according to the physical meaning of the work amount. For example, if the task target is to move small pieces of wood from one point to another, then the work_amount may be 166000 if 166000 pieces of wood are to be moved. In a maintenance task, if there are 8 screws to unscrew, the UnScrew work_amount will be set to 8.

#### Priority

The `priority` of a task is a positive integer that can take any value. It is not bounded. The priority only has an effect if the solver is requested to optimize the global schedule in terms of task priorities (a “priority objective”): in that case, a task with a higher priority *may* be scheduled before a task with a lower priority, provided all constraints are satisfied.

#### Optional

The `optional` attribute of a task is a boolean. It is set to `False` by default. If set to `True` the solver may, or may not, schedule the task.

Three `Task` derivative classes can be used to represent a task:

### ZeroDurationTask class

A `ZeroDurationTask` is a task such that \(duration=0\), that is to say \(start=end\). It is useful to represent project milestones, or other important points in time.

```
project_kickup = ZeroDurationTask('KickUp')
```

Warning

Each `Task` instance must have a unique name in the scheduling problem. To prevent two tasks from sharing the same name, ProcessScheduler raises an exception if a task is created with a name that already exists.

### FixedDurationTask class

The duration of a `FixedDurationTask` is known *a priori*. You must pass the `duration` parameter value when creating the instance:

```
# I assume one period to be mapped to 15min, cooking will be 1.5 hour
# so the chicken requires 6*15mn=1.5h to be cooked
cook_chicken = FixedDurationTask('CookChicken', duration=6)
```

### VariableDurationTask class

A `VariableDurationTask` represents a task for which the duration is not known. The solver is expected to find a duration that satisfies the constraints (the duration may depend on the number of resources assigned to the task). You can bound the duration by using the `max_duration` and/or `min_duration` parameters.

```
# 48h max to get done
plant_wheat_seeds = VariableDurationTask('PlantWheatSeeds', max_duration=48)
```

### Optional tasks

All task instances are mandatory by default: the solver has to find a solution where all tasks are actually scheduled. However, task instances can be turned into optional tasks by setting the `optional` flag to `True`:

```
# 10mn to clean the table. This is an optional task
clean_the_table_after_meal = FixedDurationTask('CleanTable', duration=10, optional=True)
```

An optional task may or may not be scheduled by the solver. It depends on the constraints that bound the scheduling problem.

## Task Constraints

ProcessScheduler provides a set of ready-to-use temporal task constraints. They allow expressing common rules such as “the task A must start exactly at the instant 4”, “the task B must end at the same time as the task C ends”, “the task C must be scheduled exactly 3 periods after the task D is completed” etc.

Note

Naming convention: if the class name starts with `Task` then the constraint applies to one single task, if the class name starts with `Tasks` it applies to 2 or more task instances.

### Time constraints

- `TaskPrecedence`: takes two parameters `task_1` and `task_2` and constrains `task_2` to be scheduled after `task_1` is completed. The precedence type can either be `'lax'` (default, `task_2.start` >= `task_1.end`), `'strict'` (`task_2.start` > `task_1.end`) or `'tight'` (`task_2.start` == `task_1.end`, task_2 starts immediately after task_1 is completed). An optional parameter `offset` can additionally be set.

```
task_1 = ps.FixedDurationTask('Task1', duration=3)
task_2 = ps.VariableDurationTask('Task2')
pc = ps.TaskPrecedence(task_1, task_2, kind='tight', offset=2)
```

This constrains the solver to start task_2 exactly 2 periods after task_1 is completed.
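The three precedence kinds can be compared with a small plain-Python check. This is only a sketch of the semantics, not ProcessScheduler code: `precedence_holds` is a hypothetical helper, and it assumes that the offset is added to the end of the first task.

```python
def precedence_holds(end_1: int, start_2: int,
                     kind: str = "lax", offset: int = 0) -> bool:
    """Check a precedence relation between two already scheduled tasks.

    Illustrative helper, not a ProcessScheduler function. Assumption: the
    offset is added to the end of the first task.
    """
    earliest = end_1 + offset
    if kind == "lax":
        return start_2 >= earliest
    if kind == "strict":
        return start_2 > earliest
    if kind == "tight":
        return start_2 == earliest
    raise ValueError(f"unknown precedence kind: {kind}")


# Task1 lasts 3 periods; if it starts at 0, it ends at 3.
# With kind='tight' and offset=2, Task2 has to start at 5.
print(precedence_holds(3, 5, kind="tight", offset=2))  # True
print(precedence_holds(3, 6, kind="tight", offset=2))  # False
print(precedence_holds(3, 6, kind="lax", offset=2))    # True
```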

- `TasksStartSynced`: takes two parameters `task_1` and `task_2` such that the schedule must satisfy the constraint \(task\_1.start = task\_2.start\)
- `TasksEndSynced`: takes two parameters `task_1` and `task_2` such that the schedule must satisfy the constraint \(task\_1.end = task\_2.end\)
- `TasksDontOverlap`: takes two parameters `task_1` and `task_2` such that task_1 ends before task_2 starts, or the opposite (task_2 ends before task_1 starts)
- `TaskStartAt`: takes two parameters `task` and `value` such that the task starts exactly at the instant *value*: \(task.start = value\)
- `TaskStartAfterStrict`: the constraint \(task.start > value\)
- `TaskStartAfterLax`: the constraint \(task.start \geq value\)
- `TaskEndAt`: takes two parameters `task` and `value` such that the task ends exactly at the instant *value*: \(task.end = value\)
- `TaskEndBeforeStrict`: the constraint \(task.end < value\)
- `TaskEndBeforeLax`: the constraint \(task.end \leq value\)
- `TasksContiguous`: takes a list of tasks and forces the schedule so that the tasks are contiguous
- `ScheduleNTasksInTimeIntervals`: given a set of \(m\) different tasks and a list of time intervals, schedules \(N\) tasks among \(m\) in these time intervals
- `ResourceTasksDistance`: takes a mandatory attribute `distance` (integer) and an optional `time_periods` (list of couples of integers, e.g. `[[0, 1], [5, 19]]`). All tasks that use the given resource and are scheduled within the `time_periods` must have a maximal distance of `distance` (the distance being the time between two consecutive tasks).

Note

If a task is optional, these constraints apply only if the task is scheduled. If the solver does not schedule the task, the constraints do not apply.

### Optional tasks constraints

The following constraints apply to optional tasks only:

- `OptionalTaskConditionSchedule` creates a constraint that adds a condition for the task to be scheduled. The condition is a z3 `BoolRef`,
- `OptionalTasksDependency` takes two optional tasks `task_1` and `task_2`, and ensures that if task_1 is scheduled then task_2 is scheduled as well,
- `ForceScheduleNOptionalTasks` forces \(m\) optional tasks among \(n\) to be scheduled, with \(m \leq n\).

Note

All the Task constraints may be defined as **optional**. The `optional` attribute is set to `False` by default, which means the task constraint is mandatory. If you set it to `True`, the constraint becomes optional, and the solver decides whether or not it applies. You can force the solver to apply an optional constraint:

```
pb.add_constraint([task.applied == True])
```

## Resource

According to the APICS dictionary, a resource is anything that adds value to a product or service in its creation, production, or delivery.

In the context of ProcessScheduler, a resource is anything that is needed by a task to be successfully processed. In a scheduling problem, resources can be human beings, machines, inventories, rooms or beds in a hotel or a hospital, elevators, etc.

ProcessScheduler provides the following classes to deal with resources.

### Worker

A worker is an atomic countable resource. *Atomic* means it cannot be divided into smaller parts. *Countable* means it is discrete and available in a finite number, in a finite time. The `Worker` class can be used to represent machines or humans. A Worker has the ability to process a task, alone or together with other workers/resources.

A Worker is created as follows:

```
john = Worker('JohnBenis')
```

### Productivity

The worker `productivity` is the quantity of work the worker can produce per period. The default productivity for a worker is `0`.

For example, if two drillers are available, the first one with a productivity of 3 holes per period and the second one with a productivity of 9 holes per period, then they are defined as:

```
driller_1 = Worker('Driller1', productivity=3)
driller_2 = Worker('Driller2', productivity=9)
```

Note

The worker `productivity` is used by the solver to satisfy the targeted task `work_amount` parameter value.
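The relation between `work_amount`, `productivity` and task duration can be illustrated with a few lines of plain Python. This is a sketch under one assumption: the work done is the sum of productivity × duration over the assigned workers, and it has to reach the work amount. The helper `minimal_duration` and the figure of 36 holes are hypothetical.

```python
def minimal_duration(work_amount: int, productivities: list) -> int:
    """Smallest integer duration such that the assigned workers complete
    the work amount.

    Assumption for this sketch: work done = sum(productivity * duration).
    Not a ProcessScheduler function.
    """
    total_productivity = sum(productivities)
    if total_productivity <= 0:
        raise ValueError("at least one productive worker is required")
    # integer ceiling division, the duration is a whole number of periods
    return -(-work_amount // total_productivity)


# Hypothetical drilling task: 36 holes to drill
print(minimal_duration(36, [3]))     # Driller1 alone: 12 periods
print(minimal_duration(36, [9]))     # Driller2 alone: 4 periods
print(minimal_duration(36, [3, 9]))  # both drillers: 3 periods
```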

### Cost

Cost information can be added to any resource. ProcessScheduler can use this information to compute the total cost of a schedule, the cost for a resource, or optimize the schedule so that the cost is the lowest (minimization, see the Objective section). There are currently two different ways to define a resource cost:

- the class `ConstantCostPerPeriod`: the cost of the resource is constant over time.

```
dev_1 = Worker('SeniorDeveloper', cost=ConstantCostPerPeriod(750))
```

- the class `PolynomialCostFunction`: the cost of the resource evolves as a polynomial function of time. It is useful to represent, for example, energy cost that is known to be unstable (oil) or time dependent (electricity). The `cost` parameter takes any Python function (i.e. a `callable` object).

```
def quadratic_time_function(t):
    return (t-20)**2 + 154
cost_function = PolynomialCostFunction(quadratic_time_function)
dev_1 = Worker('AWorker', cost=cost_function)
```

The worker `cost` is set to `None` by default.

The cost function can be plotted using matplotlib, for information purposes. Just give the plotter the range to be plotted:

```
cost_function.plot([0, 200])
```

Warning

Currently, ProcessScheduler can handle integer numbers only. Therefore, all the coefficients of the polynomial must be integer numbers. If floating point numbers are used, no exception is raised, but you might face strange results in the cost computation.
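Since no exception is raised, a quick sanity check before building the model can help. The sketch below is plain Python and independent of ProcessScheduler; `yields_integers` and `float_time_function` are hypothetical names, and the check only covers integer instants up to a chosen horizon.

```python
def quadratic_time_function(t):
    return (t - 20) ** 2 + 154


def float_time_function(t):
    # floating point coefficient: the kind of function to avoid
    return 0.5 * t + 10


def yields_integers(cost_function, horizon: int) -> bool:
    """Return True if the function returns an int for every integer
    instant of [0, horizon]. Hypothetical helper, for illustration."""
    return all(isinstance(cost_function(t), int) for t in range(horizon + 1))


print(yields_integers(quadratic_time_function, 200))  # True
print(yields_integers(float_time_function, 200))      # False
```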

Note

The worker `cost_per_period` is useful to measure the total cost of a resource/a set of resources/a schedule, or to find the schedule that minimizes the total cost of a resource/a set of resources/a schedule.

### CumulativeWorker

A cumulative worker can process several tasks in parallel. The maximal number of simultaneous tasks the worker can process is defined by the `size` parameter.

```
# the machine A can process up to 4 tasks at the same time
machine_A = CumulativeWorker('MachineA', size=4)
```

## Resource assignment

The solver decides which resource(s) should be assigned for a task to be processed. A `Worker` instance can process only one task per time period whereas a `CumulativeWorker` can process different tasks at the same time.

### Single resource assignment

Tell the task that it requires a resource to be processed:

```
assemble_engine = FixedDurationTask('AssembleCarEngine', duration=10)
john = Worker('JohnBenis')
# the AssembleCarEngine can be processed by JohnBenis ONLY
assemble_engine.add_required_resource(john)
```

### Multiple resources assignment

To require several resources to process one single task:

```
paint_car = FixedDurationTask('PaintCar', duration=13)
john = Worker('JohnBenis')
alice = Worker('AliceParker')
# the PaintCar task requires JohnBenis AND AliceParker
paint_car.add_required_resources([john, alice])
```

### Alternative resource assignment

The `SelectWorkers` class lets the solver decide which resource(s) to assign to a task, among a collection of workers that have the ability to process the task. The solver can decide to assign exactly \(n\) resources, **at most** \(n\) or **at least** \(n\). For example, if 3 drillers are available, and if a drilling task can be processed by any one of these 3 drillers, it is defined as:

```
drilling_hole = FixedDurationTask('DrillHolePhi10mm', duration=10)
driller_1 = Worker('Driller1')
driller_2 = Worker('Driller2')
driller_3 = Worker('Driller3')
# the DrillHolePhi10mm task can be processed by the Driller1 OR the Driller2 OR the Driller 3
drilling_hole.add_required_resource(SelectWorkers([driller_1, driller_2, driller_3],
                                                  nb_workers_to_select=1,
                                                  kind='exact'))
```

This tells the solver to assign *exactly 1* resource among the list of the three workers able to process the task. The `kind` parameter can take either `'exact'` (default value), `'min'` or `'max'` values.

`nb_workers_to_select` can take any integer between 1 (default value) and the number of capable workers from the list.
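The meaning of the three `kind` values can be summarized with a plain-Python predicate. This sketch assumes that `'min'` stands for "at least" and `'max'` for "at most"; `selection_is_valid` is a hypothetical helper, not part of the library.

```python
def selection_is_valid(nb_selected: int,
                       nb_workers_to_select: int = 1,
                       kind: str = "exact") -> bool:
    """Tell whether a number of selected workers satisfies the request.

    Illustrative helper. Assumption: 'min' means at least and 'max'
    means at most nb_workers_to_select.
    """
    if kind == "exact":
        return nb_selected == nb_workers_to_select
    if kind == "min":
        return nb_selected >= nb_workers_to_select
    if kind == "max":
        return nb_selected <= nb_workers_to_select
    raise ValueError(f"unknown kind: {kind}")


# Three drillers are available; how many may the solver pick?
for kind in ("exact", "min", "max"):
    allowed = [n for n in range(0, 4) if selection_is_valid(n, 2, kind)]
    print(kind, allowed)
```

With `nb_workers_to_select=2`, the loop prints `[2]` for `'exact'`, `[2, 3]` for `'min'` and `[0, 1, 2]` for `'max'`.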

## Resource Constraints

ProcessScheduler provides a set of ready-to-use resource constraints, listed below. They allow expressing common rules such as “the resource A is available only from 8 am to 12” etc.

### WorkLoad

The `WorkLoad` constraint can be used to restrict the number of tasks which are executed during certain time periods.

This constraint applies to one resource, whether it is a single worker or a cumulative worker. It takes the time periods as a Python dictionary composed of time intervals (the keys) and an integer number (the capacity). The `kind` parameter defines which kind of restriction applies to the resource: `'exact'`, `'max'` (default value) or `'min'`.

```
c1 = ps.WorkLoad(worker_1, {(0, 6): 2})
```

In the previous example, the resource `worker_1` cannot be scheduled into more than 2 timeslots between instants 0 and 6.

Any number of time intervals can be passed to this class, just extend the timeslots dictionary, e.g.:

```
c1 = ps.WorkLoad(worker_1, {(0, 6): 2,
                            (19, 21): 6})
```

The `WorkLoad` is not necessarily a *limitation*. Indeed, you can specify that the integer number is actually an exact or minimal value to target. For example, if we need the resource `worker_1` to be scheduled **at least** into three time slots between instants 0 and 10, then:

```
c1 = ps.WorkLoad(worker_1, {(0, 10): 3}, kind='min')
```
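To make the counting explicit, the sketch below checks a workload rule against a known set of task intervals in plain Python. It assumes a single `Worker` (no overlapping tasks) and that the load is the number of busy periods inside the interval; `busy_periods` and `workload_holds` are hypothetical helpers, not the library's implementation.

```python
def busy_periods(assignments, interval):
    """Count the periods of `interval` during which the worker is busy.

    `assignments` is a list of (start, end) couples of the tasks assigned
    to the worker, assumed not to overlap.
    """
    lower, upper = interval
    return sum(max(0, min(end, upper) - max(start, lower))
               for start, end in assignments)


def workload_holds(assignments, interval, capacity, kind="max"):
    load = busy_periods(assignments, interval)
    if kind == "max":
        return load <= capacity
    if kind == "min":
        return load >= capacity
    if kind == "exact":
        return load == capacity
    raise ValueError(f"unknown kind: {kind}")


# worker_1 processes two tasks, scheduled on [1, 3] and [5, 8]
tasks = [(1, 3), (5, 8)]
print(busy_periods(tasks, (0, 6)))                   # 3
print(workload_holds(tasks, (0, 6), 2))              # False
print(workload_holds(tasks, (0, 10), 3, kind="min")) # True
```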

### DistinctWorkers

A `DistinctWorkers` constraint applies to two `SelectWorkers` instances, used to assign alternative resources to a task. It constrains the solver to select different workers for each `SelectWorkers`. For instance:

```
s1 = ps.SelectWorkers([worker_1, worker_2])
s2 = ps.SelectWorkers([worker_1, worker_2])
```

could lead the solver to select worker_1 in both cases. Adding the following line:

```
cs = ps.DistinctWorkers(s1, s2)
```

lets the solver select worker_1 for s1 and worker_2 for s2 or the opposite, worker_2 for s1 and worker_1 for s2. The cases where worker_1 is selected by both s1 and s2, or worker_2 is selected by both s1 and s2, are impossible.

### SameWorkers

A `SameWorkers` constraint applies to two `SelectWorkers` instances. It constrains the solver to ensure that both `SelectWorkers` instances select the same worker. For example:

```
s1 = ps.SelectWorkers([worker_1, worker_2])
s2 = ps.SelectWorkers([worker_1, worker_2])
```

could lead the solver to select worker_1 for s1 and worker_2 for s2. Adding the following line:

```
cs = ps.SameWorkers(s1, s2)
```

ensures either worker_1 is selected by both s1 and s2, or worker_2 is selected by both s1 and s2.

## Buffer

A `Buffer` is an object where tasks can load or unload a finite number of items. A `Buffer` can be used to represent a tank, or a temporary buffer of a workshop where manufactured parts are temporarily stored.

A `NonConcurrentBuffer` is a specific buffer where tasks cannot load and/or unload at the same time. In other words, only one task can access the buffer at a given time. A `NonConcurrentBuffer` has three main attributes:

- the `initial_state`, i.e. the number of items in the buffer at time t=0,
- the `lower_bound`, an optional parameter that sets the minimum number of items in the buffer during the schedule. If the solver cannot find a solution where the buffer level is always greater than the `lower_bound`, it will report an *unsatisfiable* problem,
- the `upper_bound`, an optional parameter that sets the maximum number of items in the buffer during the schedule (in other words, the buffer capacity). If the solver cannot find a solution where the buffer level is always lower than the `upper_bound`, it will report an *unsatisfiable* problem.

All three, `initial_state`, `lower_bound` and `upper_bound`, are optional parameters. A `NonConcurrentBuffer` can be created as follows:

```
buff1 = ps.NonConcurrentBuffer("Buffer1")
buff2 = ps.NonConcurrentBuffer("Buffer2", initial_state=10)
buff3 = ps.NonConcurrentBuffer("Buffer3", lower_bound=0)
buff4 = ps.NonConcurrentBuffer("Buffer4", upper_bound=20)
buff5 = ps.NonConcurrentBuffer("Buffer5",
                               initial_state=3,
                               lower_bound=0,
                               upper_bound=10)
```

### Buffer constraints

Buffers are loaded/unloaded by tasks. As a consequence, special task constraints are used to connect tasks to buffers: `TaskUnloadBuffer` and `TaskLoadBuffer`. Both classes take the task instance, the target buffer, and a `quantity`. Load/Unload constraints can be created as follows:

```
c1 = ps.TaskUnloadBuffer(task_1, buffer, quantity=3)
c2 = ps.TaskUnloadBuffer(task_2, buffer, quantity=6)
# etc.
```

Note

There is no limitation on the number of buffers and/or buffer constraints.

### Example

Let’s take an example where a task `T1` uses a machine `M1` to manufacture a part (the duration of this task is 4). It takes one part from `Buffer1` and loads it into `Buffer2`.

```
machine_1 = ps.Worker('M1')
task_1 = ps.FixedDurationTask('T1', duration=4)
task_1.add_required_resource(machine_1)
# then create the buffers
buffer_1 = ps.NonConcurrentBuffer("Buffer1", initial_state=5)
buffer_2 = ps.NonConcurrentBuffer("Buffer2", initial_state=0)
# buffer constraints
c1 = ps.TaskUnloadBuffer(task_1, buffer_1, quantity=1)
c2 = ps.TaskLoadBuffer(task_1, buffer_2, quantity=1)
```

The graphical output shows the Gantt chart and the evolution of the buffer states along the time line.
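The evolution of the buffer states can be reproduced by hand with a short plain-Python simulation. This sketch makes two assumptions that are not stated above: the unloading happens when the task starts and the loading when it ends, and the solver schedules `T1` on the interval [1, 5]. The function `buffer_levels` is a hypothetical helper.

```python
def buffer_levels(initial_state, events, horizon):
    """Return the buffer level at each instant of [0, horizon].

    `events` maps an instant to a signed quantity: negative for an
    unloading, positive for a loading. Illustrative helper only.
    """
    levels = []
    level = initial_state
    for instant in range(horizon + 1):
        level += events.get(instant, 0)
        levels.append(level)
    return levels


# Assumed schedule for T1 (duration 4): start=1, end=5
task_start, task_end = 1, 5
buffer_1 = buffer_levels(5, {task_start: -1}, horizon=6)
buffer_2 = buffer_levels(0, {task_end: +1}, horizon=6)
print(buffer_1)  # [5, 4, 4, 4, 4, 4, 4]
print(buffer_2)  # [0, 0, 0, 0, 0, 1, 1]
```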

## First order logic constraints

Builtin constraints may not be sufficient to cover the large number of use cases users may encounter. Rather than extending the builtin constraints more and more, ProcessScheduler lets you build your own constraints using logical operators, implications and if-then-else statements between builtin constraints or class attributes.

### Logical operators

Logical operators and (\(\wedge\)), or (\(\lor\)), xor (\(\oplus\)), not (\(\lnot\)) are provided through the functions `and_()`, `or_()`, `xor_()` and `not_()`.

Note

Mind the trailing underscore character at the end of the function names. It is necessary because `and`, `or`, and `not` are Python keywords that cannot be overloaded. This naming convention may conflict with functions from the `operator` standard module.

Using builtin task constraints in combination with logical operators enables rich expressivity. Imagine that you need a task \(t_1\) to NOT start at time 3. At first glance, you might expect a `TaskDontStartAt` constraint to fit your needs, but it is not available from the builtin constraints library. The solution is to express this constraint in terms of first order logic, and state that you need the rule:

\(\lnot TaskStartAt(t_1, 3)\)

In Python, this gives:

```
not_(TaskStartAt(t_1, 3))
```

You can combine/nest any of these operators to express a complex constraint. For example, if you don’t want the task to start at 3, and also you don’t want it to end at 9, then the rule to implement is:

\(\lnot TaskStartAt(t_1, 3) \wedge \lnot TaskEndAt(t_1, 9)\)

In Python:

```
and_([not_(TaskStartAt(t_1, 3)),
      not_(TaskEndAt(t_1, 9))])
```

In a more general case, those logical functions can take both task constraints and task attributes. For example, the following assertion is possible:

```
problem.add_constraint(t_1.start == t_2.end + t_4.duration)
```

### Logical Implication - Conditional expressions

The logical implication (\(\implies\)) is wrapped by the `implies()` function. It takes two parameters: a condition, that always has to be `True` or `False`, and a list of assertions that are to be implied if the condition is `True`. For example, the following logical implication:

\(t_2.start = 4 \implies TasksEndSynced(t_3, t_4)\)

is written in Python:

```
consequence = implies(t_2.start == 4,
                      [TasksEndSynced(t_3, t_4)])
problem.add_constraint(consequence)
```

Finally, an if/then/else statement is available through the function `if_then_else()` which takes 3 parameters: a condition and two lists of assertions that apply depending on whether the condition is `True` or `False`.

```
ite = if_then_else(t_2.start == 4,  # condition
                   [TasksEndSynced(t_3, t_4)],  # if the condition is True
                   [TasksStartSynced(t_3, t_4)])  # if the condition is False
problem.add_constraint(ite)
```

Note

The `implies()` and `if_then_else()` function names do not conflict with any other function name from another package, thus they don’t have any underscore suffix.
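The truth values of both constructs can be checked on a concrete schedule with plain Python booleans. This is only a sketch of the logic: the library functions build symbolic expressions for the solver, whereas the hypothetical helpers `implies_value` and `if_then_else_value` below evaluate already known values.

```python
def implies_value(condition: bool, assertions: list) -> bool:
    """An implication is violated only when the condition is True and
    one of the implied assertions is False."""
    return (not condition) or all(assertions)


def if_then_else_value(condition: bool, then_list: list, else_list: list) -> bool:
    return all(then_list) if condition else all(else_list)


# Hypothetical solution values: t_3 and t_4 end together but start apart
t_3_start, t_3_end = 2, 10
t_4_start, t_4_end = 5, 10
ends_synced = t_3_end == t_4_end
starts_synced = t_3_start == t_4_start

for t_2_start in (4, 7):
    condition = t_2_start == 4
    print(t_2_start,
          implies_value(condition, [ends_synced]),
          if_then_else_value(condition, [ends_synced], [starts_synced]))
```

With `t_2.start == 4` both rules hold. With `t_2.start == 7` the implication still holds (the condition is false), while the if/then/else rule fails because the start times differ.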

## Indicator

### Indicator class

The `Indicator` class defines a criterion that quantifies the schedule so that it can be compared with other schedules. An `Indicator` instance results in a conclusion such as ‘the schedule A is better than schedule B because the indicator XXX is greater/lower’.

An `Indicator` instance is created by passing the indicator name as well as the mathematical expression to compute. For example:

```
flow_time = Indicator('FlowTime', task1.end + task2.end)
duration_square = Indicator('MinTaskDuration', task1.duration ** 2 + task2.duration ** 2)
```

Indicator values are computed by the solver, and are part of the solution. If the solution is rendered as a matplotlib Gantt chart, the indicator values are displayed on the upper right corner of the chart.

Indicators can also be bounded, although it is an optional feature. Bounds are useful if the indicator is later maximized (or minimized) by an optimization solver, in order to reduce the computation time. For example,

```
indicator1 = Indicator('Example1', task2.start - task1.end, bounds = (0,100)) # If lower and upper bounded
indicator2 = Indicator('Example2', task3.start - task2.end, bounds = (None,100)) # If only upper bounded
indicator3 = Indicator('Example3', task4.start - task3.end, bounds = (0,None)) # If only lower bounded
```

Note

There is no limit to the number of Indicators defined in the problem. The mathematical expression must be expressed in a polynomial form; the `Sqrt()` function can also be used. Other advanced mathematical functions such as `exp()`, `sin()`, etc. are not allowed because they are not supported by the solver.

### Builtin indicators: resource cost, utilization, number of tasks assigned

Three usual indicators are available: the utilization of a resource, the cost of a resource (or a list of resources), and the number of tasks assigned to a resource.

Use the `add_indicator_resource_utilization()` method to add a utilization computation to your schedule. This method takes a resource and computes its utilization. The result is a percentage: a utilization of 100% means that the resource is assigned during 100% of the schedule timeline. In the following example, the indicator reports the utilization percentage of `worker_1`.

```
utilization_res_1 = problem.add_indicator_resource_utilization(worker_1)
```
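As a rough illustration of what such a percentage means, the plain-Python sketch below computes a utilization from a list of task intervals. It is not the library's formula: `utilization_percentage` is a hypothetical helper, and the use of integer division (because the solver only handles integers) is an assumption.

```python
def utilization_percentage(busy_intervals, horizon: int) -> int:
    """Share of the timeline [0, horizon] during which a resource is busy.

    `busy_intervals` is a list of non-overlapping (start, end) couples.
    Assumption: the result is truncated to an integer percentage.
    """
    busy = sum(end - start for start, end in busy_intervals)
    return (100 * busy) // horizon


# worker_1 is busy on [0, 4] and [6, 9] in a schedule of horizon 10
print(utilization_percentage([(0, 4), (6, 9)], horizon=10))  # 70
```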

The `add_indicator_resource_cost()` method returns the total cost of a resource (or a list of resources). It is computed using each cost function defined for each resource. In the following example, the indicator `cost_ind` is the total cost for both resources `worker_1` and `worker_2`.

```
cost_ind = problem.add_indicator_resource_cost([worker_1, worker_2])
```

Finally, the `add_indicator_number_tasks_assigned()` method returns the number of tasks assigned to a resource after the schedule is completed.

```
problem.add_indicator_number_tasks_assigned(worker)
```

## Optimization

ProcessScheduler is able to compute optimized schedules according to one objective (single objective optimization) or any number of objectives (multi-objective optimization).

### Objective

An objective is a target value for an `Indicator` or any of the variables defined in the scheduling problem:

- if the target value is known, then the objective can either be `ExactObjective`, `MinimumObjective` or `MaximumObjective`,
- if the target value is unknown but you want to find a minimal or maximal value, then the objective can be the result of an optimization resolution, `MaximizeObjective` or `MinimizeObjective`.

Warning

Using the `MaximizeObjective` or `MinimizeObjective` classes turns the problem into an optimization problem. This results in heavier computations and, thus, a longer time for the problem to be solved.

For example, if you need to optimize the utilization of a resource (bounded up to 100% within the problem horizon),

```
# first create the indicator
utilization_res_1 = problem.add_indicator_resource_utilization(worker_1)
# then tell the solver to optimize this value
ps.MaximizeObjective("MaximizeResource1Utilization", utilization_res_1)
```

### Builtin optimization objectives¶

For any problem, the following builtin objectives are available:

`add_objective_makespan()`: minimize the schedule horizon,

`add_objective_resource_utilization()`: maximize resource occupation,

`add_objective_resource_cost()`: minimize the total cost for selected resource(s),

`add_objective_priorities()`: minimize total priority indicator (tasks with high priorities will be scheduled before tasks with lower priorities, under the condition however that all constraints are satisfied),

`add_objective_start_earliest()`: minimize the start time of the last task to be scheduled,

`add_objective_start_latest()`: maximize the start time of the first task to be scheduled,

`add_objective_flowtime()`: minimize flowtime,

`add_objective_flowtime_single_resource()`: minimize flowtime of a single resource on a specific time interval.

### Available solvers : incremental and optimize¶

The default optimization solver is `incremental`. After a solution is found, the solver runs again and again to find a better one, until the maximum allowed time is reached. If you provide a small `max_time` value, the solver exits with the last value found, although a better one may exist. In that case, increase `max_time` and run the solver again.

```
solver = ps.SchedulingSolver(pb, max_time=300) # 300s is 5 minutes
solution = solver.solve()
```
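
The incremental strategy described above can be sketched in plain Python. This is a toy model and not the library's implementation: `find_value_below` is a hypothetical stand-in for one solver call, and the feasible values are made up.

```python
import time

def incremental_minimize(find_value_below, max_time=1.0):
    """Repeatedly ask for a value strictly below the best one found so far.

    find_value_below(bound) stands in for one solver call: it returns a
    feasible objective value lower than bound (None means no bound yet),
    or None if no such value exists.
    Returns (best_value, proven_optimal).
    """
    deadline = time.monotonic() + max_time
    best = None
    while time.monotonic() < deadline:
        candidate = find_value_below(best)
        if candidate is None:
            # nothing better exists: best is the optimum
            # (it stays None if the problem has no solution at all)
            return best, True
        best = candidate
    # time is up: best is the last value found, a better one may exist
    return best, False

# hypothetical feasible makespans; the stand-in returns the largest admissible
# value, so each call improves on the previous one by a single step
feasible = {21, 22, 28}

def find_value_below(bound):
    admissible = [v for v in feasible if bound is None or v < bound]
    return max(admissible) if admissible else None

print(incremental_minimize(find_value_below))  # (21, True)
```

With a very small `max_time`, the second element of the result is `False`, which corresponds to the case where the solver exits before proving that the last value is optimal.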

The other available solver is called `optimize`, which uses the builtin optsmt z3 solver. The computation cannot be interrupted, so be careful if the problem to solve involves many tasks/resources. However, `optimize` is guaranteed to return the optimal value.

```
solver = ps.SchedulingSolver(pb, optimizer="optimize")
solution = solver.solve()
```

### Single objective optimization¶

Imagine you need to schedule one specific task `task_1` as late as possible. After defining the task as usual, create the objective and set the optimization target:

```
pb = ps.SchedulingProblem('SingleObjective1', horizon=20)
task_1 = ps.FixedDurationTask('task1', duration = 3)
indicator_1 = ps.Indicator('Task1End', task_1.end)
ps.MaximizeObjective('MaximizeTask1End', indicator_1)
ps.SchedulingSolver(pb).solve()
```

The expected value for the indicator_1 maximization is 20. After running the script, you may get the following output:

```
Solver type:
===========
-> Standard SAT/SMT solver
Incremental optimizer:
======================
Found better value: 3 elapsed time:0.000s
Checking better value >3
Found better value: 4 elapsed time:0.071s
Checking better value >4
[...]
Checking better value >18
Found better value: 19 elapsed time:0.074s
Checking better value >19
Found better value: 20 elapsed time:0.074s
Checking better value >20
No solution can be found for problem MultiObjective2.
Reason: Unsatisfiable problem: no solution exists
Found optimum 20. Stopping iteration.
total number of iterations: 19
value: 20
MultiObjective2 satisfiability checked in 0.07s
```

The solver returns the expected result.

### Multiple objective optimization using the incremental solver (default)¶

ProcessScheduler can handle multi-objective optimization. There is no limit on the number of objectives.

Imagine you need to schedule two tasks `task_1` and `task_2` as late as possible. After defining the tasks as usual, create the objectives and set the optimization targets:

```
pb = ps.SchedulingProblem('MultiObjective1', horizon=20)
task_1 = ps.FixedDurationTask('task1', duration = 3)
task_2 = ps.FixedDurationTask('task2', duration = 3)
indicator_1 = ps.Indicator('Task1End', task_1.end)
indicator_2 = ps.Indicator('Task2End', task_2.end)
ps.MaximizeObjective('MaximizeTask1End', indicator_1)
ps.MaximizeObjective('MaximizeTask2End', indicator_2)
solution = ps.SchedulingSolver(pb).solve()
print(solution)
```

After running the script, you may get the following output:

```
[...]
{
"horizon": 20,
"indicators": {
"EquivalentIndicator": -40,
"Task1End": 20,
"Task2End": 20
},
[...]
```

The solver gives the expected result. Note that an EquivalentIndicator is built from both indicators. A maximization problem is always turned into a minimization problem (the equivalent indicator is negative).

### Weighted objectives¶

In the previous example, if we add a constraint between `task_1` and `task_2`, then the ends of the two tasks may no longer be independent of each other. For example, let’s add the following constraint:

```
pb.add_constraint(task_1.end == 20 - task_2.start)
```

This acts as a kind of balance: the later `task_1` is scheduled, the sooner `task_2` is scheduled. We can leave both optimizations enabled, but the solver has to know what to do with these conflicting objectives, and especially what their relative **weight** is.

Note

`MaximizeObjective` and `MinimizeObjective` have an optional `weight` parameter, set to `1.0` by default. The higher this value, the more important the objective.

The Python script looks like this:

```
import processscheduler as ps
pb = ps.SchedulingProblem('MultiObjective2', horizon=20)
task_1 = ps.FixedDurationTask('task1', duration = 3)
task_2 = ps.FixedDurationTask('task2', duration = 3)
pb.add_constraint(task_1.end == 20 - task_2.start)
indicator_1 = ps.Indicator('Task1End', task_1.end)
indicator_2 = ps.Indicator('Task2End', task_2.end)
ps.MaximizeObjective('MaximizeTask1End', indicator_1, weight=1.)
ps.MaximizeObjective('MaximizeTask2End', indicator_2, weight=1.)
solution = ps.SchedulingSolver(pb).solve()
print(solution)
```

```
"indicators": {
"EquivalentIndicator": -23,
"Task1End": 20,
"Task2End": 3
},
```

The solver decides to schedule Task1 at the end of the timeline. Let’s change the relative weights so that the second objective is considered more important:

```
ps.MaximizeObjective('MaximizeTask1End', indicator_1, weight=1.)
# the second one is ten times more important
ps.MaximizeObjective('MaximizeTask2End', indicator_2, weight=10.)
```

This leads the solver to another solution:

```
"indicators": {
"EquivalentIndicator": -203,
"Task1End": 3,
"Task2End": 20
},
```
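
The three `EquivalentIndicator` values reported in this section are consistent with a negated weighted sum of the individual indicators. The sketch below reproduces them under that assumption. `equivalent_indicator` is a hypothetical helper, not a library function, and the library may build the indicator differently.

```python
def equivalent_indicator(objectives):
    """objectives: list of (value, weight) pairs for maximization objectives.

    The sum is negated because maximizing it amounts to minimizing its opposite.
    """
    return -sum(weight * value for value, weight in objectives)

print(equivalent_indicator([(20, 1), (20, 1)]))   # -40, independent tasks
print(equivalent_indicator([(20, 1), (3, 1)]))    # -23, equal weights
print(equivalent_indicator([(3, 1), (20, 10)]))   # -203, task_2 weighted 10
```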

### Multiple objective optimization using the optimize solver¶

### Lexicon priority (`'lex'`, default)¶

The solver optimizes the first objective, then the second one while keeping the first value, then the third one keeping both previous values etc.

In the previous example, the first objective to be optimized is the end of task_1, whose maximum is obviously 20. This value being fixed, the only remaining solution is for task_2 to start at 0, so the task_2 end is 3.

```
ps.SchedulingSolver(pb, optimizer='optimize', optimize_priority='lex').solve()
```

And the output

```
Optimization results:
=====================
->Objective priority specification: lex
->Objective values:
->Indicator_Task1End(max objective): 20
->Indicator_Task2End(max objective): 3
```

### Box priority (`'box'`)¶

The optimization solver breaks the dependency between objectives and looks for the maximum (resp. minimum) value that can be achieved for each objective.

In the previous example, the maximum of task_1.end can be 20, and the maximum of task_2.end can also be 20, but not at the same time. The `box` priority then tells you which values can be reached for each objective.

```
ps.SchedulingSolver(pb, optimizer='optimize', optimize_priority='box').solve()
```

And the output

```
Optimization results:
=====================
->Objective priority specification: lex
->Objective values:
->Indicator_Task1End(max objective): 20
->Indicator_Task2End(max objective): 20
```

Note

In `box` mode, both objectives may not be reachable simultaneously. The solver nevertheless returns a solution that satisfies **all** constraints (by default the solution obtained from the lexicon mode).
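
The difference between the `lex` and `box` results can be reproduced by brute force on this small example. The sketch below does not call the solver. It assumes integer start times and enumerates every pair of end dates allowed by the constraint `task_1.end == 20 - task_2.start`.

```python
HORIZON, DURATION = 20, 3

# all (task_1.end, task_2.end) pairs allowed by task_1.end == 20 - task_2.start
feasible = [
    (s1 + DURATION, s2 + DURATION)
    for s1 in range(HORIZON - DURATION + 1)
    for s2 in range(HORIZON - DURATION + 1)
    if s1 + DURATION == HORIZON - s2
]

# 'lex': maximize the first objective, then the second among the remaining solutions
best_end_1 = max(e1 for e1, _ in feasible)
lex = max((e1, e2) for e1, e2 in feasible if e1 == best_end_1)

# 'box': maximize each objective independently of the other
box = (max(e1 for e1, _ in feasible), max(e2 for _, e2 in feasible))

print(lex)  # (20, 3)
print(box)  # (20, 20)
```

The `box` pair (20, 20) is not itself a feasible schedule: it only reports the best value reachable for each objective taken alone.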

### Pareto priority (`'pareto'`)¶

The optimization solver suggests a new solution each time the `solve()` method is called. This allows traversing all solutions. Indeed, task_1 can end at 20 and task_2 at 3, but task_1 can also end at 19 and task_2 at 4, etc. All of these are solutions of the optimization problem.

The python code has to be slightly modified:

```
solver = ps.SchedulingSolver(pb, optimizer='optimize', optimize_priority='pareto')
while solver.solve():
    pass
```

And the output will be:

```
Optimization results:
=====================
->Objective priority specification: pareto
->Objective values:
->Indicator_Task1End(max objective): 20
->Indicator_Task2End(max objective): 3
SAT computation time:
=====================
MultiObjective2 satisfiability checked in 0.00s
Optimization results:
=====================
->Objective priority specification: pareto
->Objective values:
->Indicator_Task1End(max objective): 19
->Indicator_Task2End(max objective): 4
SAT computation time:
=====================
MultiObjective2 satisfiability checked in 0.00s
Optimization results:
=====================
->Objective priority specification: pareto
->Objective values:
->Indicator_Task1End(max objective): 18
->Indicator_Task2End(max objective): 5
[...]
```

This yields 18 different solutions. You can add a test in the loop to stop iterating as soon as a solution suits you.
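
The count of 18 can be checked without the solver. The sketch below assumes integer start times, enumerates the feasible pairs of end dates, and keeps those that no other pair dominates. The helper `dominates` is hypothetical.

```python
HORIZON, DURATION = 20, 3

# all (task_1.end, task_2.end) pairs allowed by task_1.end == 20 - task_2.start
feasible = [
    (s1 + DURATION, s2 + DURATION)
    for s1 in range(HORIZON - DURATION + 1)
    for s2 in range(HORIZON - DURATION + 1)
    if s1 + DURATION == HORIZON - s2
]

def dominates(a, b):
    """True if a is at least as good as b on both objectives and differs from it."""
    return a != b and a[0] >= b[0] and a[1] >= b[1]

pareto_front = sorted(
    (p for p in feasible if not any(dominates(q, p) for q in feasible)),
    reverse=True,
)

print(len(pareto_front))  # 18
print(pareto_front[:3])   # [(20, 3), (19, 4), (18, 5)]
```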

## Problem solving¶

Solving a scheduling problem involves the `SchedulingSolver` class.

### Define the solver¶

A `SchedulingSolver` instance takes a `SchedulingProblem` instance:

```
solver = SchedulingSolver(scheduling_problem_instance)
```

It takes the following optional arguments:

`debug`: boolean, `False` by default. If set to `True`, outputs a lot of useful information.

`max_time`: in seconds, the maximal time allowed to find a solution. Default is 60s.

`parallel`: boolean, `False` by default. If `True`, forces the solver to run in multithreaded mode. It *might* be quicker. It might not.

`random_values`: boolean, `False` by default. If set to `True`, enables a builtin generator to set random initial values. With this attribute set to `True`, the solver is expected to give a different solution each time it is called.

`logics`: a string, `None` by default. Can be set to any of the supported z3 logics, “QF_IDL”, “QF_LIA”, etc. See https://smtlib.cs.uiowa.edu/logics.shtml. By default (`logics` set to `None`), the solver tries to find the best logic, but setting a specific logic can bring significant improvements (“QF_IDL” or “QF_UFIDL” seem to give the best performance).

`verbosity`: an integer, 0 by default. 1 or 2 increases the solver verbosity. To be used for debugging or inspection purposes.

### Solve¶

Just call the `solve()` method. This method returns a `Solution` instance.

```
solution = solver.solve()
```

Running the `solve()` method can either fail or succeed, according to the 4 following cases:

1. The problem cannot be solved because some constraints are contradictory. It is called “Unsatisfiable”. The `solve()` method returns False. For example:

```
TaskStartAt(cook_the_chicken, 2)
TaskStartAt(cook_the_chicken, 3)
```

It is obvious that these constraints cannot both be satisfied.

2. The problem cannot be solved for an unknown reason (the satisfiability of the set of constraints cannot be computed). The `solve()` method returns False.

3. The solver takes too long to complete and exceeds the allowed `max_time`. The `solve()` method returns False.

4. The solver succeeds in finding a schedule that satisfies all the constraints. The `solve()` method returns the solution as a JSON dictionary.

Note

If the solver fails to give a solution, increase the `max_time` (case 3) or remove some constraints (cases 1 and 2).

### Find another solution¶

The solver may return:

one solution among many, in the case where there is no optimization,

the best possible schedule in the case of an optimization problem.

In both cases, you may want to look at a different schedule that fits all the constraints. Use the `find_another_solution()` method and pass the variable for which you want the solver to find another value.

Note

Before requesting another solution, the `solve()` method must first be executed, i.e. there should already be a current solution.

You can pass any variable to the `find_another_solution()` method: a task start, a task end, a task duration, a resource productivity etc.

For example, there are 5 different ways to schedule a FixedDurationTask with a duration=2 in a horizon of 6. The default solution returned by the solver is:

```
problem = ps.SchedulingProblem('FindAnotherSolution', horizon=6)
solutions =[]
task_1 = ps.FixedDurationTask('task1', duration=2)
problem.add_task(task_1)
solver = ps.SchedulingSolver(problem)
solution = solver.solve()
print("Solution for task_1.start:", task_1.scheduled_start)
```

```
Solution for task_1.start: 0
```

Then, we can request another solution:

```
solution = solver.find_another_solution(task_1.start)
if solution is not None:
    print("New solution for task_1.start:", solution.tasks[task_1.name].start)
```

```
New solution for task_1.start: 1
```

You can call `find_another_solution()` repeatedly to find all possible solutions, until the solver fails to return a new one.
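
The figure of 5 possible schedules quoted above can be verified by hand. The sketch below assumes integer time steps and simply lists the start times for which the task still ends within the horizon. `possible_starts` is a hypothetical helper that does not use the solver.

```python
def possible_starts(duration, horizon):
    """Integer start times such that the task ends no later than the horizon."""
    return [start for start in range(horizon + 1) if start + duration <= horizon]

print(possible_starts(duration=2, horizon=6))  # [0, 1, 2, 3, 4]
```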

### Run in debug mode¶

If the `debug` attribute is set to True, the z3 solver is run with the unsat_core option. This results in a much longer computation time, but helps identify the conflicting constraints. Because of this higher resource consumption, the `debug` flag should be used only if the solver fails to find a solution.

### Render to a Gantt chart¶

Call the `render_gantt_matplotlib()` method to render the solution as a Gantt chart. The timeline goes from 0 to the `horizon` value. You can choose to render either `Task` or `Resource` (default).

```
solution = solver.solve()
if solution is not None:
    solution.render_gantt_matplotlib()  # default render_mode is 'Resource'
    # a second gantt chart, in 'Task' mode
    solution.render_gantt_matplotlib(render_mode='Task')
```

Call the `render_gantt_plotly()` method to render the solution as a Gantt chart using **plotly**. Take care that plotly rendering needs **real timepoints** (set at least `delta_time` at the problem creation).

```
solution = solver.solve()
if solution is not None:
    # default render_mode is 'Resource'
    solution.render_gantt_plotly(sort="Start", html_filename="index.html")
    # a second gantt chart, in 'Task' mode
    solution.render_gantt_plotly(render_mode='Task')
```

## Use case: flowshop scheduling¶

This example is based on the paper by Tao et al. (2015), where the authors present an introductory example. In a flow shop problem, a set of \(n\) jobs has to be processed on \(m\) different machines in the same order. Job \(j\), \(j=1,2,...,n\), is processed on machines \(i\), \(i=1,2,...,m\), with a nonnegative processing time \(p(i,j)\) and a release date \(r_j\), which is the earliest time at which the job may be processed. Each machine can process at most one job and each job can be handled by at most one machine at any given time. The machine processes the jobs in a first come, first served manner. The goal is to determine a job sequence that minimizes the makespan.
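
To make the definition concrete, the sketch below computes the makespan of a given job order by hand and then searches all orders by brute force. The release dates are the ones used in the constraints of this use case. The processing times are hypothetical placeholders, not the values from the paper, so the resulting numbers are not comparable with the authors' results.

```python
from itertools import permutations

# release dates from this use case; the processing times are placeholders
release = {"J1": 0, "J2": 9, "J3": 2, "J4": 7}
processing = {  # job -> (time on M1, time on M2, time on M3), hypothetical values
    "J1": (2, 3, 1),
    "J2": (1, 2, 2),
    "J3": (3, 1, 2),
    "J4": (2, 2, 3),
}

def makespan(order):
    """Completion time of the last job on the last machine for a given job order."""
    machine_free = [0, 0, 0]  # time at which each machine becomes available
    for job in order:
        ready = release[job]  # the job cannot start before its release date
        for m, duration in enumerate(processing[job]):
            start = max(ready, machine_free[m])
            ready = machine_free[m] = start + duration
    return machine_free[-1]

print(makespan(("J1", "J3", "J4", "J2")))  # 16

# brute force over the 24 possible job orders
best_order = min(permutations(release), key=makespan)
print(best_order, makespan(best_order))
```

Brute force is only practical for a handful of jobs, which is why the rest of this use case hands the search to the solver.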

The problem statement is:

The following solution is reported by the authors (order J1, J3, J4, J2, scheduled horizon: 29):

In this notebook, we try to reproduce the results reported by the authors.

**Reference**

Tao Ren, Meiting Guo, Lin Lin, Yunhui Miao, “A Local Search Algorithm for the Flow Shop Scheduling Problem with Release Dates”, Discrete Dynamics in Nature and Society, vol. 2015, Article ID 320140, 8 pages, 2015. https://doi.org/10.1155/2015/320140

### Imports¶

```
import processscheduler as ps
%config InlineBackend.figure_formats = ['svg']
```

### Create the scheduling problem¶

The total horizon is unknown: leave it empty and only set the problem name.

```
flow_shop_problem = ps.SchedulingProblem('FlowShop')
```

### Create the 3 machines M1, M2 and M3¶

```
M3 = ps.Worker('M3')
M2 = ps.Worker('M2')
M1 = ps.Worker('M1')
```

### Assign resources¶

One machine per task.

```
J11.add_required_resource(M1)
J12.add_required_resource(M2)
J13.add_required_resource(M3)
J21.add_required_resource(M1)
J22.add_required_resource(M2)
J23.add_required_resource(M3)
J31.add_required_resource(M1)
J32.add_required_resource(M2)
J33.add_required_resource(M3)
J41.add_required_resource(M1)
J42.add_required_resource(M2)
J43.add_required_resource(M3)
```

### Constraint: release dates¶

```
r1 = 0
r2 = 9
r3 = 2
r4 = 7
ps.TaskStartAfterLax(J11, r1)
ps.TaskStartAfterLax(J12, r1)
ps.TaskStartAfterLax(J13, r1)
ps.TaskStartAfterLax(J21, r2)
ps.TaskStartAfterLax(J22, r2)
ps.TaskStartAfterLax(J23, r2)
ps.TaskStartAfterLax(J31, r3)
ps.TaskStartAfterLax(J32, r3)
ps.TaskStartAfterLax(J33, r3)
ps.TaskStartAfterLax(J41, r4)
ps.TaskStartAfterLax(J42, r4)
ps.TaskStartAfterLax(J43, r4)
```

```
TaskStartAfterLax_d50b63ba(<class 'processscheduler.task_constraint.TaskStartAfterLax'>)
1 assertion(s):
J43_start >= 7
```

### Constraints: precedence¶

All jobs should be scheduled in the same order on each machine. The constraint is expressed as follows: all J1 tasks must be scheduled before OR after all J2 tasks, AND all J3 tasks must be scheduled before OR after all J1 tasks, etc.

```
J1 = [J11, J12, J13]
J2 = [J21, J22, J23]
J3 = [J31, J32, J33]
J4 = [J41, J42, J43]
# we need the combinations function of the itertools module
# to compute all pairs from the list of jobs.
from itertools import combinations
for Ja, Jb in combinations([J1, J2, J3, J4], 2):
    befores = []
    afters = []
    for i in range(3):
        Ja_before_Jb = ps.TaskPrecedence(Ja[i], Jb[i])
        Ja_after_Jb = ps.TaskPrecedence(Jb[i], Ja[i])
        befores.append(Ja_before_Jb)
        afters.append(Ja_after_Jb)
    # either Ja precedes Jb on every machine, or Jb precedes Ja on every machine
    xor_operation = ps.xor_([ps.and_(befores), ps.and_(afters)])
    flow_shop_problem.add_constraint(xor_operation)
```
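
The logic of this constraint can be checked on plain numbers. The sketch below is independent of the solver: `consistent_order` is a hypothetical helper that applies the same "before on every machine, exclusive or after on every machine" test to made-up time intervals, with a precedence read as "end of the first task no later than start of the second".

```python
from itertools import combinations

def consistent_order(job_a, job_b):
    """job_a, job_b: one (start, end) interval per machine.

    True when job_a precedes job_b on every machine, or follows it on every machine.
    """
    a_first = all(a_end <= b_start for (_, a_end), (b_start, _) in zip(job_a, job_b))
    b_first = all(b_end <= a_start for (a_start, _), (_, b_end) in zip(job_a, job_b))
    return a_first != b_first  # exclusive or

# hypothetical intervals on M1, M2, M3
J1 = [(0, 2), (2, 5), (5, 6)]
J3 = [(2, 5), (5, 6), (6, 8)]
J3_swapped = [(2, 5), (0, 1), (6, 8)]  # runs before J1 on M2 only

print(consistent_order(J1, J3))          # True
print(consistent_order(J1, J3_swapped))  # False

# four jobs give six pairs, hence six xor constraints in the loop above
print(len(list(combinations(["J1", "J2", "J3", "J4"], 2))))  # 6
```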

### Add makespan objective¶

```
makespan_obj = flow_shop_problem.add_objective_makespan()
```

### Solution, plot the schedule¶

```
solver = ps.SchedulingSolver(flow_shop_problem)
solution = solver.solve()
solution.render_gantt_matplotlib(fig_size=(10,5), render_mode='Resource')
```

```
Solver type:
===========
-> Standard SAT/SMT solver
Incremental optimizer:
======================
Found value: 28 elapsed time:0.007s
Checking better value <28
Found value: 22 elapsed time:0.008s
Checking better value <22
Found value: 21 elapsed time:0.008s
Checking better value <21
No solution can be found for problem FlowShop.
Reason: Unsatisfiable problem: no solution exists
Found optimum 21. Stopping iteration.
total number of iterations: 4
value: 21
FlowShop satisfiability checked in 0.01s
```

We confirm the job order from Tao et al. (2015) (J1, then J3, J4 and finally J2). The horizon here is only 21.

## Use case: formula one pitstop¶

This example is based on the DailyMail blog entry https://www.dailymail.co.uk/sport/formulaone/article-4401632/Formula-One-pit-stop-does-crew-work.html where a nice image shows 21 people changing the 4 tires of a Formula 1 Ferrari. In this example, only 16 out of 21 people are represented. This notebook can be tested online at mybinder.org

```
from IPython.display import YouTubeVideo
YouTubeVideo('aHSUp7msCIE', width=800, height=300)
```

### Imports¶

```
import processscheduler as ps
%config InlineBackend.figure_formats = ['svg']
```

### Create the scheduling problem¶

The total horizon is not known: leave it empty and only set the problem name.

```
change_tires_problem = ps.SchedulingProblem('ChangeTires')
```

### Create the 16 available resources¶

Each person in and around the car is represented as a worker.

```
nb_lifters = 2
nb_gunners = 4
nb_tyre_handlers = 8
nb_stabilizers = 2
```

```
# one worker per crew member
lifters = [ps.Worker('JackOperator%i' % (i + 1)) for i in range(nb_lifters)]
gunners = [ps.Worker('Gunner%i' % (i + 1)) for i in range(nb_gunners)]
tyre_handlers = [ps.Worker('Handler%i' % (i + 1)) for i in range(nb_tyre_handlers)]
stabilizers = [ps.Worker('Stabilizer%i' % (i + 1)) for i in range(nb_stabilizers)]
```

### Create tasks and assign resources¶

One period is mapped to one second. For example, if lifting the rear takes 2 seconds, then the duration is set to 2.

```
# lift tasks and lifters
# each lift task is assigned to one specific jack operator
lift_rear_up = ps.FixedDurationTask('LiftRearUp', duration=2)
lift_front_up = ps.FixedDurationTask('LiftFrontUp', duration=2)
lift_rear_up.add_required_resource(lifters[0])
lift_front_up.add_required_resource(lifters[1])
lift_rear_down = ps.FixedDurationTask('LiftRearDown', duration=2)
lift_front_down = ps.FixedDurationTask('LiftFrontDown', duration=2)
lift_rear_down.add_required_resource(lifters[0])
lift_front_down.add_required_resource(lifters[1])
# unscrew tasks
unscrew_front_left_tyre = ps.FixedDurationTask('UnScrewFrontLeftTyre', duration=2)
unscrew_front_right_tyre = ps.FixedDurationTask('UnScrewFrontRightTyre', duration=2)
unscrew_rear_left_tyre = ps.FixedDurationTask('UnScrewRearLeftTyre', duration=2)
unscrew_rear_right_tyre = ps.FixedDurationTask('UnScrewRearRightTyre', duration=2)
gunner_unscrew_front_left_tyre = ps.SelectWorkers(gunners, 1)
unscrew_front_left_tyre.add_required_resource(gunner_unscrew_front_left_tyre)
gunner_unscrew_front_right_tyre = ps.SelectWorkers(gunners, 1)
unscrew_front_right_tyre.add_required_resource(gunner_unscrew_front_right_tyre)
gunner_unscrew_rear_left_tyre = ps.SelectWorkers(gunners, 1)
unscrew_rear_left_tyre.add_required_resource(gunner_unscrew_rear_left_tyre)
gunner_unscrew_rear_right_tyre = ps.SelectWorkers(gunners, 1)
unscrew_rear_right_tyre.add_required_resource(gunner_unscrew_rear_right_tyre)
# screw tasks and gunners
screw_front_left_tyre = ps.FixedDurationTask('ScrewFrontLeftTyre', duration=2)
screw_front_right_tyre = ps.FixedDurationTask('ScrewFrontRightTyre', duration=2)
screw_rear_left_tyre = ps.FixedDurationTask('ScrewRearLeftTyre', duration=2)
screw_rear_right_tyre = ps.FixedDurationTask('ScrewRearRightTyre', duration=2)
gunner_screw_front_left_tyre = ps.SelectWorkers(gunners)
screw_front_left_tyre.add_required_resource(gunner_screw_front_left_tyre)
gunner_screw_front_right_tyre = ps.SelectWorkers(gunners)
screw_front_right_tyre.add_required_resource(gunner_screw_front_right_tyre)
gunner_screw_rear_left_tyre = ps.SelectWorkers(gunners)
screw_rear_left_tyre.add_required_resource(gunner_screw_rear_left_tyre)
gunner_screw_rear_right_tyre = ps.SelectWorkers(gunners)
screw_rear_right_tyre.add_required_resource(gunner_screw_rear_right_tyre)
```

```
# tires OFF and handlers
front_left_tyre_off = ps.FixedDurationTask('FrontLeftTyreOff', duration=2)
front_right_tyre_off = ps.FixedDurationTask('FrontRightTyreOff', duration=2)
rear_left_tyre_off = ps.FixedDurationTask('RearLeftTyreOff', duration=2)
rear_right_tyre_off = ps.FixedDurationTask('RearRightTyreOff', duration=2)
for tyre_off_task in [front_left_tyre_off, front_right_tyre_off,
                      rear_left_tyre_off, rear_right_tyre_off]:
    tyre_off_task.add_required_resource(ps.SelectWorkers(tyre_handlers))
# tires ON and handlers, same as above
front_left_tyre_on = ps.FixedDurationTask('FrontLeftTyreOn', duration=2)
front_right_tyre_on = ps.FixedDurationTask('FrontRightTyreOn', duration=2)
rear_left_tyre_on = ps.FixedDurationTask('RearLeftTyreOn', duration=2)
rear_right_tyre_on = ps.FixedDurationTask('RearRightTyreOn', duration=2)
for tyre_on_task in [front_left_tyre_on, front_right_tyre_on,
                     rear_left_tyre_on, rear_right_tyre_on]:
    tyre_on_task.add_required_resource(ps.SelectWorkers(tyre_handlers))
```

**Stabilizers** start their job as soon as the car is stopped until the end of the whole activity.

```
stabilize_left = ps.VariableDurationTask('StabilizeLeft')
stabilize_right = ps.VariableDurationTask('StabilizeRight')
stabilize_left.add_required_resource(stabilizers[0])
stabilize_right.add_required_resource(stabilizers[1])
ps.TaskStartAt(stabilize_left, 0)
ps.TaskStartAt(stabilize_right, 0)
ps.TaskEndAt(stabilize_left, change_tires_problem.horizon)
ps.TaskEndAt(stabilize_right, change_tires_problem.horizon)
```

```
TaskEndAt_e5e8c229(<class 'processscheduler.task_constraint.TaskEndAt'>)
1 assertion(s):
StabilizeRight_end == horizon
```

### Task precedences¶

```
# front left tyre operations
fr_left = [unscrew_front_left_tyre, front_left_tyre_off, front_left_tyre_on,
           screw_front_left_tyre]
for i in range(len(fr_left) - 1):
    ps.TaskPrecedence(fr_left[i], fr_left[i+1])
# front right tyre operations
fr_right = [unscrew_front_right_tyre, front_right_tyre_off, front_right_tyre_on,
            screw_front_right_tyre]
for i in range(len(fr_right) - 1):
    ps.TaskPrecedence(fr_right[i], fr_right[i+1])
# rear left tyre operations
re_left = [unscrew_rear_left_tyre, rear_left_tyre_off, rear_left_tyre_on,
           screw_rear_left_tyre]
for i in range(len(re_left) - 1):
    ps.TaskPrecedence(re_left[i], re_left[i+1])
# rear right tyre operations
re_right = [unscrew_rear_right_tyre, rear_right_tyre_off, rear_right_tyre_on,
            screw_rear_right_tyre]
for i in range(len(re_right) - 1):
    ps.TaskPrecedence(re_right[i], re_right[i+1])
# all unscrew operations must start after the car is lifted by both front and rear jacks
for unscrew_task in [unscrew_front_left_tyre, unscrew_front_right_tyre,
                     unscrew_rear_left_tyre, unscrew_rear_right_tyre]:
    ps.TaskPrecedence(lift_rear_up, unscrew_task)
    ps.TaskPrecedence(lift_front_up, unscrew_task)
# lift down operations must occur after each screw task is completed
for screw_task in [screw_front_left_tyre, screw_front_right_tyre,
                   screw_rear_left_tyre, screw_rear_right_tyre]:
    ps.TaskPrecedence(screw_task, lift_rear_down)
    ps.TaskPrecedence(screw_task, lift_front_down)
```

### First solution, plot the schedule¶

```
solver = ps.SchedulingSolver(change_tires_problem)
solution_1 = solver.solve()
solution_1.render_gantt_matplotlib(fig_size=(10,5), render_mode='Resource')
```

```
Solver type:
===========
-> Standard SAT/SMT solver
Total computation time:
=====================
ChangeTires satisfiability checked in 0.05s
```

### Second solution: add a makespan objective¶

Obviously, the former solution is not the *best* solution, not sure Ferrari will win this race! The whole “change tires” activity must be as short as possible, so let’s add a *makespan* objective, i.e. a constraint that minimizes the schedule horizon.

```
# add makespan objective
change_tires_problem.add_objective_makespan()
solver_2 = ps.SchedulingSolver(change_tires_problem)
solution_2 = solver_2.solve()
solution_2.render_gantt_matplotlib(fig_size=(9,5), render_mode='Task')
```

```
Solver type:
===========
-> Standard SAT/SMT solver
Incremental optimizer:
======================
Found value: 24 elapsed time:0.048s
Checking better value <24
Found value: 23 elapsed time:0.163s
Checking better value <23
Found value: 22 elapsed time:0.171s
Checking better value <22
Found value: 21 elapsed time:0.197s
Checking better value <21
Found value: 20 elapsed time:0.202s
Checking better value <20
Found value: 19 elapsed time:0.209s
Checking better value <19
Found value: 18 elapsed time:0.214s
Checking better value <18
Found value: 17 elapsed time:0.234s
Checking better value <17
Found value: 16 elapsed time:0.240s
Checking better value <16
Found value: 15 elapsed time:0.256s
Checking better value <15
Found value: 14 elapsed time:0.260s
Checking better value <14
Found value: 13 elapsed time:0.283s
Checking better value <13
Found value: 12 elapsed time:0.287s
Checking better value <12
No solution can be found for problem ChangeTires.
Reason: Unsatisfiable problem: no solution exists
Found optimum 12. Stopping iteration.
total number of iterations: 14
value: 12
ChangeTires satisfiability checked in 0.29s
```
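
The optimum of 12 reported above matches the length of the longest precedence chain for one wheel: lift up, unscrew, tyre off, tyre on, screw, lift down, each lasting 2 periods. The sketch below computes this lower bound by hand. The task labels are simplified and hypothetical, and the bound is only reachable when resources do not limit parallel work on the four wheels, which is plausible here with 4 gunners and 8 handlers.

```python
# precedence chain for one wheel, every task lasts 2 periods
durations = {
    "lift_up": 2, "unscrew": 2, "tyre_off": 2,
    "tyre_on": 2, "screw": 2, "lift_down": 2,
}
predecessors = {
    "lift_up": [],
    "unscrew": ["lift_up"],
    "tyre_off": ["unscrew"],
    "tyre_on": ["tyre_off"],
    "screw": ["tyre_on"],
    "lift_down": ["screw"],
}

def earliest_end(task):
    """Earliest end of a task when it starts as soon as all its predecessors are done."""
    start = max((earliest_end(p) for p in predecessors[task]), default=0)
    return start + durations[task]

print(earliest_end("lift_down"))  # 12
```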

### Third solution: constrain workers¶

This is not the best possible solution. Indeed, we can notice that Gunner2 unscrews the RearRightTyre and screws the RearLeft tyre. A solution where gunners run around the car is not acceptable. There are two ways to fix the schedule:

let the gunners move around the car, and add a “Move” task with a duration that represents the time necessary to move from one tyre to the other,

constrain each worker to screw the same tyre he unscrewed.

Let’s go this way.

```
ps.SameWorkers(gunner_unscrew_front_left_tyre, gunner_screw_front_left_tyre)
ps.SameWorkers(gunner_unscrew_front_right_tyre, gunner_screw_front_right_tyre)
ps.SameWorkers(gunner_unscrew_rear_left_tyre, gunner_screw_rear_left_tyre)
ps.SameWorkers(gunner_unscrew_rear_right_tyre, gunner_screw_rear_right_tyre)
solver_3 = ps.SchedulingSolver(change_tires_problem)
solution_3 = solver_3.solve()
solution_3.render_gantt_matplotlib(fig_size=(9,5), render_mode='Task')
```

```
Solver type:
===========
-> Standard SAT/SMT solver
Incremental optimizer:
======================
Found value: 28 elapsed time:0.084s
Checking better value <28
Found value: 24 elapsed time:0.091s
Checking better value <24
Found value: 23 elapsed time:0.166s
Checking better value <23
Found value: 22 elapsed time:0.170s
Checking better value <22
Found value: 21 elapsed time:0.174s
Checking better value <21
Found value: 20 elapsed time:0.177s
Checking better value <20
Found value: 19 elapsed time:0.181s
Checking better value <19
Found value: 18 elapsed time:0.185s
Checking better value <18
Found value: 17 elapsed time:0.189s
Checking better value <17
Found value: 16 elapsed time:0.193s
Checking better value <16
Found value: 15 elapsed time:0.197s
Checking better value <15
Found value: 14 elapsed time:0.200s
Checking better value <14
Found value: 13 elapsed time:0.204s
Checking better value <13
Found value: 12 elapsed time:0.208s
Checking better value <12
No solution can be found for problem ChangeTires.
Reason: Unsatisfiable problem: no solution exists
Found optimum 12. Stopping iteration.
total number of iterations: 15
value: 12
ChangeTires satisfiability checked in 0.21s
```

This is much better!

## Use case: software development¶

To illustrate the way to use ProcessScheduler, let’s imagine the following simple use case: the development of a scheduling software intended for end-users. The software is developed using Python, and provides a modern Qt GUI. Three junior developers are in charge (Elias, Louis, Elise), under the supervision of their project manager Justine. The objective of this document is to generate a schedule of the different development tasks to go from the early design stages to the first software release. This notebook can be tested online at mybinder.org

### Step 1. Import the module¶

The best way to import the processscheduler module is to choose an alias import. Indeed, a global import could generate name conflicts. Here, the *ps* alias is used.

```
import processscheduler as ps
from datetime import timedelta, datetime
%config InlineBackend.figure_formats = ['svg']
```

### Step 2. Create the scheduling problem¶

The SchedulingProblem has to be defined. The problem must have a name (it is a mandatory argument). Of course you can create as many problems (i.e. SchedulingProblem instances) as needed, for example if you need to compare two or more different schedules.

```
problem = ps.SchedulingProblem('SoftwareDevelopment',
delta_time=timedelta(days=1),
start_time=datetime.now())
```

### Step 3. Create tasks instances¶

In this example, one period is one day.

```
preliminary_design = ps.FixedDurationTask('PreliminaryDesign', duration=1) # 1 day
core_development = ps.VariableDurationTask('CoreDevelopmenent', work_amount=10)
gui_development = ps.VariableDurationTask('GUIDevelopment', work_amount=15)
integration = ps.VariableDurationTask('Integration', work_amount=3)
tests_development = ps.VariableDurationTask('TestDevelopment', work_amount=8)
release = ps.ZeroDurationTask('ReleaseMilestone')
```

### Step 4. Create tasks time constraints¶

Define precedences, or set start and end times.

```
ps.TaskStartAt(preliminary_design, 0)
ps.TaskPrecedence(preliminary_design, core_development)
ps.TaskPrecedence(preliminary_design, gui_development)
ps.TaskPrecedence(gui_development, tests_development)
ps.TaskPrecedence(core_development, tests_development)
ps.TaskPrecedence(tests_development, integration)
ps.TaskPrecedence(integration, release)
```

```
TaskPrecedence_58faabcb(<class 'processscheduler.task_constraint.TaskPrecedence'>)
1 assertion(s):
Integration_end <= ReleaseMilestone_start
```
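As the assertion above shows, a precedence constraint states that one task must end before the next one starts. The sketch below illustrates what such constraints imply, using only the standard library: it computes the earliest possible start of each task by walking the precedence graph in topological order. The durations are hypothetical placeholders (the real ones are chosen by the solver), and resource conflicts are deliberately ignored, so this is not the schedule ProcessScheduler would return.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical durations, in periods (assumed for illustration only).
durations = {
    "PreliminaryDesign": 1,
    "CoreDevelopment": 2,
    "GUIDevelopment": 5,
    "TestDevelopment": 2,
    "Integration": 2,
    "ReleaseMilestone": 0,
}

# Each task maps to the set of tasks that must end before it starts.
predecessors = {
    "PreliminaryDesign": set(),
    "CoreDevelopment": {"PreliminaryDesign"},
    "GUIDevelopment": {"PreliminaryDesign"},
    "TestDevelopment": {"GUIDevelopment", "CoreDevelopment"},
    "Integration": {"TestDevelopment"},
    "ReleaseMilestone": {"Integration"},
}

earliest_start = {}
# static_order() yields every task after all of its predecessors.
for task in TopologicalSorter(predecessors).static_order():
    earliest_start[task] = max(
        (earliest_start[p] + durations[p] for p in predecessors[task]),
        default=0,  # tasks without predecessors can start at period 0
    )

for task, start in earliest_start.items():
    print(f"{task}: earliest start at period {start}")
```

An SMT solver handles the same inequalities declaratively, together with every other constraint of the problem, instead of propagating them in a fixed order.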

### Step 5. Create resources¶

Define all the resources required to process the tasks, including their productivity and cost per period.

```
elias = ps.Worker('Elias', productivity=2, cost=ps.ConstantCostPerPeriod(600)) # cost in $/day
louis = ps.Worker('Louis', productivity=2, cost=ps.ConstantCostPerPeriod(600))
elise = ps.Worker('Elise', productivity=3, cost=ps.ConstantCostPerPeriod(800))
justine = ps.Worker('Justine', productivity=2, cost=ps.ConstantCostPerPeriod(1200))
```

### Step 6. Assign resources to tasks¶

```
preliminary_design.add_required_resources([elias, louis, elise, justine])
core_development.add_required_resources([louis, elise])
gui_development.add_required_resources([elise])
tests_development.add_required_resources([elias, louis])
integration.add_required_resources([justine])
release.add_required_resources([justine])
```
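The assignments above, combined with the work_amount of each task and the productivity of each worker, bound the task durations from below. The following sketch is a back-of-the-envelope check, not a call into ProcessScheduler: it assumes that every assigned worker contributes their productivity during each period of the task, so that duration × total productivity must reach work_amount. The helper `min_duration` is hypothetical.

```python
# Productivity of each worker, as declared in Step 5.
productivity = {"Elias": 2, "Louis": 2, "Elise": 3, "Justine": 2}

# task name -> (work_amount, assigned workers), as declared in Steps 3 and 6.
tasks = {
    "CoreDevelopment": (10, ["Louis", "Elise"]),
    "GUIDevelopment": (15, ["Elise"]),
    "TestDevelopment": (8, ["Elias", "Louis"]),
    "Integration": (3, ["Justine"]),
}


def min_duration(work_amount, workers):
    """Smallest whole number of periods d with d * total productivity >= work_amount.

    Assumption: all workers are busy for the whole task duration.
    """
    total = sum(productivity[w] for w in workers)
    return -(-work_amount // total)  # integer ceiling division


minimal = {name: min_duration(work, who) for name, (work, who) in tasks.items()}
for name, periods in minimal.items():
    print(f"{name}: at least {periods} period(s)")
```

Note that Elise is required by both the core and the GUI development tasks, so the two cannot overlap if a worker can only process one task at a time; that kind of interaction is precisely what the solver resolves.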

### Step 7. Add a total cost indicator¶

This resource cost indicator computes the total cost of selected resources.

```
cost_ind = problem.add_indicator_resource_cost([elias, louis, elise, justine])
```
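To give an idea of what such an indicator measures, here is a minimal sketch in plain Python. It assumes that, with a constant cost per period, the cost of a resource is its cost per period multiplied by the number of periods during which it is busy. The busy-period counts below are hypothetical, not solver output.

```python
# Cost per period for each worker ($/day), as declared in Step 5.
cost_per_period = {"Elias": 600, "Louis": 600, "Elise": 800, "Justine": 1200}

# Hypothetical number of busy periods per worker (illustration only).
busy_periods = {"Elias": 3, "Louis": 5, "Elise": 8, "Justine": 3}

# Cost of each worker, then the total over all selected workers.
cost_by_worker = {
    name: cost_per_period[name] * busy_periods[name] for name in cost_per_period
}
total_cost = sum(cost_by_worker.values())

for name, cost in cost_by_worker.items():
    print(f"{name}: ${cost}")
print(f"Total: ${total_cost}")
```

In the actual model, the busy periods are decision variables, so the indicator value is only known once the problem has been solved.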

### Step 8. Solve and plot using plotly¶

```
# solve
solver = ps.SchedulingSolver(problem)
solution = solver.solve()
```

```
Solver type:
===========
-> Standard SAT/SMT solver
Total computation time:
=====================
SoftwareDevelopment satisfiability checked in 0.01s
```

```
# render the Gantt chart only if the solver found a solution
if solution:
    solution.render_gantt_plotly()
```