Skip to content

Discoveries are now closed and unregistered after failure #1667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions arduino/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,21 +374,12 @@ func (disc *PluggableDiscovery) Stop() error {
}

// Quit terminates the discovery. No more commands can be accepted by the discovery.
func (disc *PluggableDiscovery) Quit() error {
if err := disc.sendCommand("QUIT\n"); err != nil {
return err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return fmt.Errorf(tr("calling %[1]s: %[2]w"), "QUIT", err)
} else if msg.EventType != "quit" {
return errors.Errorf(tr("communication out of sync, expected '%[1]s', received '%[2]s'"), "quit", msg.EventType)
} else if msg.Error {
return errors.Errorf(tr("command failed: %s"), msg.Message)
} else if strings.ToUpper(msg.Message) != "OK" {
return errors.Errorf(tr("communication out of sync, expected '%[1]s', received '%[2]s'"), "OK", msg.Message)
func (disc *PluggableDiscovery) Quit() {
_ = disc.sendCommand("QUIT\n")
if _, err := disc.waitMessage(time.Second * 5); err != nil {
logrus.Errorf("Quitting discovery %s: %s", disc.id, err)
}
disc.killProcess()
return nil
}

// List executes an enumeration of the ports and returns a list of the available
Expand Down
4 changes: 1 addition & 3 deletions arduino/discovery/discovery_client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ out:
}

for _, disc := range discoveries {
if err := disc.Quit(); err != nil {
log.Fatal("Error stopping discovery:", err)
}
disc.Quit()
fmt.Println("Discovery QUITed")
for disc.State() == discovery.Alive {
time.Sleep(time.Millisecond)
Expand Down
47 changes: 43 additions & 4 deletions arduino/discovery/discoverymanager/discoverymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"github.com/arduino/arduino-cli/arduino/discovery"
"github.com/arduino/arduino-cli/i18n"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// DiscoveryManager is required to handle multiple pluggable-discovery that
// may be shared across platforms
type DiscoveryManager struct {
discoveries map[string]*discovery.PluggableDiscovery
discoveriesMutex sync.Mutex
discoveries map[string]*discovery.PluggableDiscovery
}

var tr = i18n.Tr
Expand All @@ -42,12 +44,16 @@ func New() *DiscoveryManager {
// Clear resets the DiscoveryManager to its initial state
func (dm *DiscoveryManager) Clear() {
dm.QuitAll()
dm.discoveriesMutex.Lock()
defer dm.discoveriesMutex.Unlock()
dm.discoveries = map[string]*discovery.PluggableDiscovery{}
}

// IDs returns the list of discoveries' ids in this DiscoveryManager
func (dm *DiscoveryManager) IDs() []string {
ids := []string{}
dm.discoveriesMutex.Lock()
defer dm.discoveriesMutex.Unlock()
for id := range dm.discoveries {
ids = append(ids, id)
}
Expand All @@ -57,19 +63,38 @@ func (dm *DiscoveryManager) IDs() []string {
// Add adds a discovery to the list of managed discoveries
func (dm *DiscoveryManager) Add(disc *discovery.PluggableDiscovery) error {
id := disc.GetID()
dm.discoveriesMutex.Lock()
defer dm.discoveriesMutex.Unlock()
if _, has := dm.discoveries[id]; has {
return errors.Errorf(tr("pluggable discovery already added: %s"), id)
}
dm.discoveries[id] = disc
return nil
}

// remove quits and deletes the discovery with specified id
// from the discoveries managed by this DiscoveryManager
func (dm *DiscoveryManager) remove(id string) {
dm.discoveriesMutex.Lock()
d := dm.discoveries[id]
delete(dm.discoveries, id)
dm.discoveriesMutex.Unlock()
d.Quit()
logrus.Infof("Closed and removed discovery %s", id)
}

// parallelize runs function f concurrently for each discovery.
// Returns a list of errors returned by each call of f.
func (dm *DiscoveryManager) parallelize(f func(d *discovery.PluggableDiscovery) error) []error {
var wg sync.WaitGroup
errChan := make(chan error)
dm.discoveriesMutex.Lock()
discoveries := []*discovery.PluggableDiscovery{}
for _, d := range dm.discoveries {
discoveries = append(discoveries, d)
}
dm.discoveriesMutex.Unlock()
for _, d := range discoveries {
wg.Add(1)
go func(d *discovery.PluggableDiscovery) {
defer wg.Done()
Expand Down Expand Up @@ -103,6 +128,7 @@ func (dm *DiscoveryManager) RunAll() []error {
}

if err := d.Run(); err != nil {
dm.remove(d.GetID())
return fmt.Errorf(tr("discovery %[1]s process not started: %[2]w"), d.GetID(), err)
}
return nil
Expand All @@ -119,6 +145,7 @@ func (dm *DiscoveryManager) StartAll() []error {
return nil
}
if err := d.Start(); err != nil {
dm.remove(d.GetID())
return fmt.Errorf(tr("starting discovery %[1]s: %[2]w"), d.GetID(), err)
}
return nil
Expand All @@ -139,6 +166,7 @@ func (dm *DiscoveryManager) StartSyncAll() (<-chan *discovery.Event, []error) {

eventCh, err := d.StartSync(5)
if err != nil {
dm.remove(d.GetID())
return fmt.Errorf(tr("start syncing discovery %[1]s: %[2]w"), d.GetID(), err)
}

Expand Down Expand Up @@ -170,6 +198,7 @@ func (dm *DiscoveryManager) StopAll() []error {
}

if err := d.Stop(); err != nil {
dm.remove(d.GetID())
return fmt.Errorf(tr("stopping discovery %[1]s: %[2]w"), d.GetID(), err)
}
return nil
Expand All @@ -185,9 +214,7 @@ func (dm *DiscoveryManager) QuitAll() []error {
return nil
}

if err := d.Quit(); err != nil {
return fmt.Errorf(tr("quitting discovery %[1]s: %[2]w"), d.GetID(), err)
}
d.Quit()
return nil
})
return errs
Expand All @@ -204,7 +231,13 @@ func (dm *DiscoveryManager) List() ([]*discovery.Port, []error) {
Port *discovery.Port
}
msgChan := make(chan listMsg)
dm.discoveriesMutex.Lock()
discoveries := []*discovery.PluggableDiscovery{}
for _, d := range dm.discoveries {
discoveries = append(discoveries, d)
}
dm.discoveriesMutex.Unlock()
for _, d := range discoveries {
wg.Add(1)
go func(d *discovery.PluggableDiscovery) {
defer wg.Done()
Expand Down Expand Up @@ -243,7 +276,13 @@ func (dm *DiscoveryManager) List() ([]*discovery.Port, []error) {
// ListCachedPorts return the current list of ports detected from all discoveries
func (dm *DiscoveryManager) ListCachedPorts() []*discovery.Port {
res := []*discovery.Port{}
dm.discoveriesMutex.Lock()
discoveries := []*discovery.PluggableDiscovery{}
for _, d := range dm.discoveries {
discoveries = append(discoveries, d)
}
dm.discoveriesMutex.Unlock()
for _, d := range discoveries {
if d.State() != discovery.Syncing {
// Discovery is not syncing
continue
Expand Down
11 changes: 5 additions & 6 deletions commands/board/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,15 +296,14 @@ func Watch(instanceID int32, interrupt <-chan bool) (<-chan *rpc.BoardListWatchR
Error: boardsError,
}
case <-interrupt:
errs := dm.StopAll()
if len(errs) > 0 {
for _, err := range dm.StopAll() {
// Discoveries that return errors have their process
// closed and are removed from the list of discoveries
// in the manager
outChan <- &rpc.BoardListWatchResponse{
EventType: "error",
Error: tr("stopping discoveries: %s", errs),
Error: tr("stopping discoveries: %s", err),
}
// Don't close the channel if quitting all discoveries
// failed, otherwise some processes might be left running.
continue
}
return
}
Expand Down