๐Ÿ“ฆ apache / cloudberry

๐Ÿ“„ gpstart ยท 929 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929#!/usr/bin/env python3
#
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
#
#
# THIS IMPORT MUST COME FIRST
#
# import mainUtils FIRST to get python version check
import sys
from optparse import OptionGroup, SUPPRESS_HELP

from gppylib.mainUtils import *

try:
    import pickle

    from gppylib.db import dbconn
    from gppylib.gpparseopts import OptParser, OptChecker
    from gppylib.gparray import *
    from gppylib.gplog import get_default_logger, log_to_file_only
    from gppylib import userinput
    from gppylib.db import catalog
    from gppylib.commands import unix
    from gppylib.commands import gp
    from gppylib.commands.gp import SEGMENT_TIMEOUT_DEFAULT
    from gppylib.commands import base
    from gppylib.commands import pg
    from gppylib.commands import dca
    from gppylib import pgconf
    from gppylib.heapchecksum import HeapChecksum
    from gppylib.commands.pg import PgControlData
    from gppylib.operations.startSegments import *
    from gppylib.operations.detect_unreachable_hosts import get_unreachable_segment_hosts, mark_segments_down_for_unreachable_hosts
    from gppylib.utils import TableLogger
    from gppylib.gp_era import GpEraFile
except ImportError as e:
    sys.exit('Cannot import modules.  Please check that you have sourced cloudberry-env.sh.  Detail: ' + str(e))

logger = get_default_logger()


# ---------------------------------------------------------------
class GpStart:
    ######
    def __init__(self, specialMode, restricted, start_standby, coordinator_datadir,
                 wrapper,
                 wrapper_args,
                 skip_standby_check,
                 parallel=gp.DEFAULT_GPSTART_NUM_WORKERS,
                 quiet=False,
                 coordinatoronly=False,
                 interactive=False,
                 timeout=SEGMENT_TIMEOUT_DEFAULT,
                 logfileDirectory=False,
                 skip_heap_checksum_validation=False,
                 fts_hosts=None,
                 etcd_hosts=None,
                 is_external_fts=False
                 ):
        assert (specialMode in [None, 'maintenance'])
        self.specialMode = specialMode
        self.restricted = restricted
        self.start_standby = start_standby
        self.pool = None
        self.parallel = parallel
        self.attempt_standby_start = False
        self.quiet = quiet
        self.coordinatoronly = coordinatoronly
        self.coordinator_datadir = coordinator_datadir
        self.interactive = interactive
        self.timeout = timeout
        self.wrapper = wrapper
        self.wrapper_args = wrapper_args
        self.skip_standby_check = skip_standby_check
        self.logfileDirectory = logfileDirectory
        self.skip_heap_checksum_validation = skip_heap_checksum_validation
        self.fts_hosts = fts_hosts
        self.etcd_hosts = etcd_hosts
        self.is_external_fts = is_external_fts
        self.singlenodemode = False

        #
        # Some variables that are set during execution
        #
        self.era = None
        self.gpversion = None
        self.gparray = None
        self.port = None
        self.gphome = None
        self.dburl = None
        self.max_connections = None
        logger.debug("Setting level of parallelism to: %d" % self.parallel)
        ######
    def _start_all_fts(self):
        if not os.path.exists(self.fts_hosts):
            raise gp.GpError("FTS host file %s does not exist." % self.fts_hosts)
        fts_host_machine_list = read_hosts(self.fts_hosts)

        etcd_config_tmp_file = "/tmp/cbdb_etcd.conf"
        coordinator_data_directory = os.environ.get("COORDINATOR_DATA_DIRECTORY")
        copy_etcd_config_file_cmd = f"gpsync -f {self.fts_hosts} {coordinator_data_directory + '/config' + '/cbdb_etcd.conf'} =:{etcd_config_tmp_file}"
        subprocess.check_output(copy_etcd_config_file_cmd, shell=True)
        logger.info("Begin to start all FTS process.")
        isdemo = (len(fts_host_machine_list) == 1)
        for fts in fts_host_machine_list:
            start_fts(fts, isdemo)
        time.sleep(30)
        for fts in fts_host_machine_list:
            if not check_fts(fts):
                raise gp.GpError("FTS process on %s is not running." % fts)
        logger.info("start all FTS process successfully")
    
    def _generte_etcd_url(self, etcd_port):
        etcd_num = 0
        etcd_host_machine_list = read_hosts(self.etcd_hosts)
        etcd_cmd = ""
        for etcd in etcd_host_machine_list:
            etcd_ip = resolve_hostname(etcd)
            if etcd_cmd == "":
                etcd_cmd = f"etcd-{etcd_num}=http://{etcd_ip}:{etcd_port}"
            else:
                etcd_cmd += f",etcd-{etcd_num}=http://{etcd_ip}:{etcd_port}"
            etcd_num += 1
        return etcd_cmd

    def _start_all_etcd(self):
        if not os.path.exists(self.etcd_hosts):
            raise gp.GpError("ETCD host file %s does not exist." % self.etcd_hosts)
        etcd_host_machine_list = read_hosts(self.etcd_hosts)
        
        logger.info("Begin to start all ETCD process.")
        if len(etcd_host_machine_list) == 1:
            start_single_etcd(etcd_host_machine_list[0])
        else:
            etcd_home_num = 0
            etcd_host_list = self._generte_etcd_url(2380)
            for etcd in etcd_host_machine_list:
                start_etcd(etcd, etcd_home_num, etcd_host_list)
                etcd_home_num += 1
        time.sleep(60)
        for etcd in etcd_host_machine_list:
            if not check_etcd(etcd):
                raise gp.GpError("ETCD process on %s is not running." % etcd)
        logger.info("start all ETCD process successfully")

    ######
    def run(self):
        self._prepare()

        if self.etcd_hosts is not None:
            self._start_all_etcd()

        if self.coordinatoronly:
            if os.getenv('GPSTART_INTERNAL_COORDINATOR_ONLY'):
                logger.info('Coordinator-only start requested for management utilities.')
            else:
                logger.warning("****************************************************************************")
                logger.warning("Coordinator-only start requested. If a standby is configured, this command")
                logger.warning("may lead to a split-brain condition and possible unrecoverable data loss.")
                logger.warning("Maintenance mode should only be used with direction from Cloudberry Support.")
                logger.warning("****************************************************************************")
                if self.interactive:
                    if not userinput.ask_yesno(None, "\nContinue with coordinator-only startup", 'N'):
                        raise UserAbortedException()

        try:
            # Disable Ctrl-C
            signal.signal(signal.SIGINT, signal.SIG_IGN)

            self._startCoordinator()
            logger.info("Coordinator Started...")

            self.singlenodemode = self.gparray.is_singlenode
            if self.singlenodemode:
                logger.warning("SinglenodeMode has been enabled, no segment will be created.")
                standby_was_started = self._start_standby()
                return 0

            if self.coordinatoronly:
                if self.is_external_fts:
                    self._start_all_fts()
                return 0

            num_workers = min(len(self.gparray.get_hostlist()), self.parallel)
            hosts = set(self.gparray.get_hostlist(includeCoordinator=False))
            # We check for unreachable segment hosts first thing, because if a host is down but its segments
            # are marked up, later checks can return invalid or misleading results and the cluster may not
            # start in a good state.
            unreachable_hosts = get_unreachable_segment_hosts(hosts, num_workers)
            if unreachable_hosts:
                mark_segments_down_for_unreachable_hosts(self.gparray.segmentPairs, unreachable_hosts)

            if self.skip_heap_checksum_validation:
                self.coordinator_checksum_value = None
                logger.warning("Because of --skip-heap-checksum-validation, the GUC for data_checksums "
                                   "will not be checked between coordinator and segments")
            else:
                self.coordinator_checksum_value = HeapChecksum(gparray=self.gparray, num_workers=num_workers,
                                                          logger=logger).get_coordinator_value()

            if not self.skip_standby_check:
                self.check_standby()
            else:
                logger.info("Skipping Standby activation status checking.")

            logger.info("Shutting down coordinator")
            self.shutdown_coordinator_only()
            # TODO: check results of command.

        finally:
            # Reenable Ctrl-C
            signal.signal(signal.SIGINT, signal.default_int_handler)

        (segmentsToStart, invalidSegments) = self._prepare_segment_start()

        if self.interactive:
            self._summarize_actions(segmentsToStart)
            if not userinput.ask_yesno(None, "\nContinue with Cloudberry instance startup", 'N'):
                raise UserAbortedException()

        try:
            # Disable Ctrl-C
            signal.signal(signal.SIGINT, signal.SIG_IGN)

            success = self._start(segmentsToStart, invalidSegments)
        finally:
            # Reenable Ctrl-C
            signal.signal(signal.SIGINT, signal.default_int_handler)

        if dca.is_dca_appliance():
            logger.info("Initializing DCA settings")
            dca.DcaGpdbInitialized.local()
            logger.info("DCA settings initialized")
        if self.is_external_fts:
            self._start_all_fts()
        return 0 if success else 1
    ######
    def cleanup(self):
        if self.pool:
            self.pool.haltWork()

            # ------------------------------- Internal Helper --------------------------------
    ######
    def _cbdb_precheck(self):
        logger.info("checking fts hosts...")
        if not os.path.exists(self.fts_hosts):
            raise gp.GpError("FTS host file %s does not exist." % self.fts_hosts)
        
        fts_host_machine_list = read_hosts(self.fts_hosts)
        for fts in fts_host_machine_list:
            if check_fts(fts):
                raise gp.GpError("FTS host %s has alived fts process, please cleanup the environment first." % fts)
            
        if self.etcd_hosts is None:
            logger.info("checking etcd hosts...")
            coordinator_datadir = os.environ.get("COORDINATOR_DATA_DIRECTORY")
            etcd_host_machine_list = read_hosts(coordinator_datadir + "/config/etcd_host")
            for etcd in etcd_host_machine_list:
                if not check_etcd(etcd):
                    raise gp.GpError("ETCD host %s has no alived etcd process, please cleanup the environment first." % etcd)
    
    ######
    def _prepare(self):
        logger.info("Gathering information and validating the environment...")
        if self.is_external_fts:
            self._cbdb_precheck()
        self._basic_setup()

        self._check_version()
        self._check_coordinator_running()

    ######
    def _basic_setup(self):
        self.gphome = gp.get_gphome()
        if self.coordinator_datadir is None:
            self.coordinator_datadir = gp.get_coordinatordatadir()
        self.user = unix.getUserName()
        gp.check_permissions(self.user)
        self._read_postgresqlconf()

    ######
    def _read_postgresqlconf(self):
        logger.debug("Obtaining coordinator's port from coordinator data directory")
        pgconf_dict = pgconf.readfile(self.coordinator_datadir + "/postgresql.conf")
        self.port = pgconf_dict.int('port')
        logger.debug("Read from postgresql.conf port=%s" % self.port)
        self.max_connections = pgconf_dict.int('max_connections')
        logger.debug("Read from postgresql.conf max_connections=%s" % self.max_connections)

    ######
    def _check_version(self):
        self.gpversion = gp.GpVersion.local('local CBDB software version check', self.gphome)
        logger.info("Cloudberry Binary Version: '%s'" % self.gpversion)

        # It would be nice to work out the catalog version => greenplum version
        # calculation so that we can print out nicer error messages when
        # version doesn't match.
        bin_catversion = gp.GpCatVersion.local('local CBDB software catalag version check', self.gphome)
        logger.info("Cloudberry Catalog Version: '%s'" % bin_catversion)

        dir_catversion = gp.GpCatVersionDirectory.local('local CBDB directory catalog version check', self.coordinator_datadir)

        if bin_catversion != dir_catversion:
            logger.info("COORDINATOR_DIRECTORY Catalog Version: '%s'" % dir_catversion)
            logger.info("Catalog Version of coordinator directory incompatible with binaries")
            raise ExceptionNoStackTraceNeeded("Catalog Versions are incompatible")

    ######
    def _check_coordinator_running(self):
        logger.debug("Check if Coordinator is already running...")
        if os.path.exists(self.coordinator_datadir + '/postmaster.pid'):
            logger.warning("postmaster.pid file exists on Coordinator, checking if recovery startup required")
            self._recovery_startup()

        self._remove_postmaster_tmpfile(self.port)

    def shutdown_coordinator_only(self):
        cmd = gp.GpStop("Shutting down coordinator", coordinatorOnly=True,
                        fast=True, quiet=logging_is_quiet(),
                        verbose=logging_is_verbose(),
                        datadir=self.coordinator_datadir,
                        parallel=self.parallel,
                        logfileDirectory=self.logfileDirectory)
        cmd.run()
        logger.debug("results of forcing coordinator shutdown: %s" % cmd)

    def fetch_tli(self, data_dir_path, remoteHost=None):
        if not remoteHost:
            controldata = PgControlData("fetching pg_controldata locally", data_dir_path)
        else:
            controldata = PgControlData("fetching pg_controldata remotely", data_dir_path, REMOTE, remoteHost)

        controldata.run(validateAfter=True)
        return int(controldata.get_value("Latest checkpoint's TimeLineID"))

    class StandbyUnreachable(Exception):
        pass

    def _standby_activated(self):
        logger.debug("Checking if standby has been activated...")

        if not self.gparray.standbyCoordinator:
            return False

        # fetch timelineids for both primary and standby (post-promote)
        primary_tli = self.fetch_tli(self.coordinator_datadir)
        try:
            standby_tli = self.fetch_tli(self.gparray.standbyCoordinator.getSegmentDataDirectory(),
                                         self.gparray.standbyCoordinator.getSegmentHostName())
        except base.ExecutionError as err:
            raise GpStart.StandbyUnreachable(err)

        logger.debug("Primary TLI = %d" % primary_tli)
        logger.debug("Standby TLI = %d" % standby_tli)
        return primary_tli < standby_tli

    def check_standby(self):
        try:
            standby_activated = self._standby_activated()
        except GpStart.StandbyUnreachable as err:
            logger.warning("Standby host is unreachable, cannot determine whether the standby is currently acting as the coordinator. Received error: %s" % err)
            logger.warning("Continue only if you are certain that the standby is not acting as the coordinator.")
            if not self.interactive or not userinput.ask_yesno(None, "\nContinue with startup", 'N'):
                if not self.interactive:
                    logger.warning("Non interactive mode detected. Not starting the cluster. Start the cluster in interactive mode.")
                self.shutdown_coordinator_only()
                raise UserAbortedException()

            # If the user wants to continue when the standby is unreachable,
            # set start_standby to False to prevent starting the unreachable
            # standy later in the startup process.
            self.start_standby = False
            return

        if not standby_activated:
            return

        # stop the coordinator we've started up.
        cmd = gp.GpStop("Shutting down coordinator", coordinatorOnly=True,
                        fast=True, quiet=logging_is_quiet(),
                        verbose=logging_is_verbose(),
                        datadir=self.coordinator_datadir,
                        parallel=self.parallel)
        cmd.run(validateAfter=True)
        logger.info("Coordinator Stopped...")
        raise ExceptionNoStackTraceNeeded("Standby activated, this node no more can act as coordinator.")

    ######
    def _recovery_startup(self):
        logger.info("Commencing recovery startup checks")

        lockfile = "/tmp/.s.PGSQL.%s" % self.port
        tmpfile_exists = os.path.exists(lockfile)

        ss_port_active = unix.PgPortIsActive.local('check ss for coordinator port',
                                                        lockfile, self.port)
        if tmpfile_exists and ss_port_active:
            logger.info("Have lock file %s and a process running on port %s" % (lockfile, self.port))
            raise ExceptionNoStackTraceNeeded("Coordinator instance process running")
        elif tmpfile_exists and not ss_port_active:
            logger.info("Have lock file %s but no process running on port %s" % (lockfile, self.port))
        elif not tmpfile_exists and ss_port_active:
            logger.info("No lock file %s but a process running on port %s" % (lockfile, self.port))
            raise ExceptionNoStackTraceNeeded("Port %s is already in use" % self.port)
        elif not tmpfile_exists and not ss_port_active:
            logger.info("No socket connection or lock file in /tmp found for port=%s" % self.port)

        logger.info("No Coordinator instance process, entering recovery startup mode")

        if tmpfile_exists:
            logger.info("Clearing Coordinator instance lock files")
            os.remove(lockfile)

        postmaster_pid_file = "%s/postmaster.pid" % self.coordinator_datadir
        if os.path.exists(postmaster_pid_file):
            logger.info("Clearing Coordinator instance pid file")
            os.remove("%s/postmaster.pid" % self.coordinator_datadir)

        self._startCoordinator()

        logger.info("Commencing forced instance shutdown")

        gp.GpStop.local("forcing coordinator shutdown", coordinatorOnly=True,
                        verbose=logging_is_verbose,
                        quiet=self.quiet, fast=False,
                        force=True, datadir=self.coordinator_datadir, parallel=self.parallel)

    ######
    def _remove_postmaster_tmpfile(self, port):
        lockfile = "/tmp/.s.PGSQL.%s" % port
        tmpfile_exists = os.path.exists(lockfile)

        if tmpfile_exists:
            logger.info("Clearing Coordinator instance lock files")
            os.remove(lockfile)
        pass

    ######
    def _summarize_actions(self, segmentsToStart):
        logger.info("--------------------------")
        logger.info("Coordinator instance parameters")
        logger.info("--------------------------")
        logger.info("Database                 = %s" % self.dburl.pgdb)
        logger.info("Coordinator Port              = %s" % self.port)
        logger.info("Coordinator directory         = %s" % self.coordinator_datadir)
        logger.info("Timeout                  = %d seconds" % self.timeout)
        if self.gparray.standbyCoordinator:
            if self.start_standby:
                logger.info("Coordinator standby start     = On")
            else:
                logger.info("Coordinator standby start     = Off")
        else:
            logger.info("Coordinator standby           = Off ")

        logger.info("--------------------------------------")
        logger.info("Segment instances that will be started")
        logger.info("--------------------------------------")

        isFileReplication = self.gparray.hasMirrors

        tabLog = TableLogger().setWarnWithArrows(True)
        header = ["Host", "Datadir", "Port"]
        if isFileReplication:
            header.append("Role")
        tabLog.info(header)
        for db in segmentsToStart:
            line = [db.getSegmentHostName(), db.getSegmentDataDirectory(), str(db.getSegmentPort())]
            if isFileReplication:
                line.append("Primary" if db.isSegmentPrimary(True) else "Mirror")
            tabLog.info(line)
        tabLog.outputTable()

    ######
    def _get_format_string(self):
        host_len = 0
        dir_len = 0
        port_len = 0
        for db in self.gparray.getSegDbList():
            if len(db.hostname) > host_len:
                host_len = len(db.hostname)
            if len(db.datadir) > dir_len:
                dir_len = len(db.datadir)
            if len(str(db.port)) > port_len:
                port_len = len(str(db.port))

        return "%-" + str(host_len) + "s  %-" + str(dir_len) + "s  %-" + str(port_len) + "s  %s"

    ######
    def _startCoordinator(self):
        if self.restricted:
            logger.info("Starting Coordinator instance in admin and RESTRICTED mode")
        else:
            logger.info("Starting Coordinator instance in admin mode")

        cmd = gp.CoordinatorStart('Coordinator in utility mode with restricted set to {0}'.format(self.restricted),
                            self.coordinator_datadir, self.port, self.era, wrapper=self.wrapper,
                            wrapper_args=self.wrapper_args, specialMode=self.specialMode,
                            restrictedMode=self.restricted, timeout=self.timeout, utilityMode=True,
                            max_connections=self.max_connections)
        cmd.run()

        if cmd.get_results().rc != 0:
            if self.restricted:
                logger.fatal("Failed to start Coordinator instance in admin and RESTRICTED mode")
            else:
                logger.fatal("Failed to start Coordinator instance in admin mode")
            cmd.validate()

        logger.info("Obtaining Cloudberry Coordinator catalog information")

        logger.info("Obtaining Segment details from coordinator...")
        self.dburl = dbconn.DbURL(port=self.port, dbname='template1')
        self.gparray = GpArray.initFromCatalog(self.dburl, utility=True)

        logger.info("Setting new coordinator era")
        e = GpEraFile(self.coordinator_datadir, logger=get_logger_if_verbose())
        e.new_era(self.gparray.coordinator.hostname, self.port, time.strftime('%y%m%d%H%M%S'))
        self.era = e.era

    ######
    def _start(self, segmentsToStart, invalidSegments):
        """ starts all of the segments, the coordinator and the standby coordinator

            returns whether all segments that should be started were started successfully

            note that the parameters do not list coordinator/standby, they only list data segments
        """
        workers = min(len(self.gparray.get_hostlist()), self.parallel)
        self.pool = base.WorkerPool(numWorkers=workers)

        if os.path.exists(self.coordinator_datadir + "/gpexpand.status") and not self.restricted:
            raise ExceptionNoStackTraceNeeded(
                "Found a System Expansion Setup in progress. Please run 'gpexpand --rollback'")

        logger.debug("gparray does%s have mirrors" % ("" if self.gparray.hasMirrors else " not"))

        if self.gparray.hasMirrors:
            startMode = START_AS_PRIMARY_OR_MIRROR
        else:
            startMode = START_AS_MIRRORLESS

        # this will eventually start gpsegstart.py
        segmentStartOp = StartSegmentsOperation(self.pool, self.quiet, self.gpversion,
                                                self.gphome, self.coordinator_datadir, self.coordinator_checksum_value,
                                                self.timeout, self.specialMode, self.wrapper, self.wrapper_args, self.parallel,
                                                logfileDirectory=self.logfileDirectory)
        segmentStartResult = segmentStartOp.startSegments(self.gparray, segmentsToStart, startMode, self.era)

        # see if we have at least one segment per content
        willShutdownSegments = not self._verify_enough_segments(segmentStartResult, self.gparray)

        # process the result of segment startup
        self._print_segment_start(segmentStartResult, invalidSegments, willShutdownSegments)

        if willShutdownSegments:
            # go through and remove any segments that we did start so that we keep everything
            # shutdown cleanly
            self._shutdown_segments(segmentStartResult)
            raise ExceptionNoStackTraceNeeded("Do not have enough valid segments to start the array.")

        failedToStart = segmentStartResult.getFailedSegmentObjs()
        coordinator_result, message = self._start_final_coordinator()
        if not coordinator_result:
            return False

        # start standby after coordinator in dispatch mode comes up
        standby_was_started = self._start_standby()

        # report if we complete operations
        return self._check_final_result(
            not standby_was_started and self.attempt_standby_start,
            failedToStart, invalidSegments, message)

    ######
    def _prepare_segment_start(self):
        segs = self.gparray.get_valid_segdbs()

        logger.debug("gp_segment_configuration indicates following valid segments")
        for seg in segs:
            logger.debug("SegDB: %s" % seg)

        # segments marked down
        invalid_segs = self.gparray.get_invalid_segdbs()

        for seg in invalid_segs:
            logger.warning("Skipping startup of segment marked down in configuration: on %s directory %s <<<<<" % \
                           (seg.getSegmentHostName(), seg.getSegmentDataDirectory()))

        return (segs, invalid_segs)

    ####
    def _verify_enough_segments(self, startResult, gparray):
        successfulSegments = startResult.getSuccessfulSegments()

        allSegmentsByContent = GpArray.getSegmentsByContentId(gparray.getSegDbList())
        successfulSegmentsByDbId = GpArray.getSegmentsGroupedByValue(successfulSegments, Segment.getSegmentDbId)

        #
        # look at each content, see if there is a segment available (or one
        # which can be made available by failing over)
        #
        for primary in gparray.getSegDbList():

            if not primary.isSegmentPrimary(current_role=True):
                continue

            # find the mirror
            segs = allSegmentsByContent[primary.getSegmentContentId()]
            mirror = None
            if len(segs) > 1:
                mirror = [s for s in segs if s.isSegmentMirror(current_role=True)][0]

            if primary.getSegmentDbId() in successfulSegmentsByDbId:
                # good, we can continue!
                continue

            if mirror is not None \
                    and mirror.getSegmentDbId() in successfulSegmentsByDbId \
                    and primary.isSegmentModeSynchronized():
                #
                # we could fail over to that mirror, so it's okay to start up like this
                #
                continue

            logger.error("No segment started for content: %d." % primary.getSegmentContentId())
            logger.info("dumping success segments: %s" % [s.__str__() for s in startResult.getSuccessfulSegments()])
            return False
        return True

    ######
    def _shutdown_segments(self, segmentStartResult):

        logger.info("Commencing parallel segment instance shutdown, please wait...")

        #
        # Note that a future optimization would be to only stop the segments that we actually started.
        #    This requires knowing which ones are left in a partially up state
        #
        #
        # gather the list of those that we actually tried to start
        toStop = []
        toStop.extend(segmentStartResult.getSuccessfulSegments())
        toStop.extend([f.getSegment() for f in segmentStartResult.getFailedSegmentObjs()])

        segmentsByHost = GpArray.getSegmentsByHostName(toStop)

        #
        # stop them, stopping primaries before mirrors
        #
        for type in ["primary", "mirror"]:
            for hostName, segments in segmentsByHost.items():

                if type == "primary":
                    segments = [seg for seg in segments if seg.isSegmentPrimary(current_role=True)]
                else:
                    segments = [seg for seg in segments if seg.isSegmentMirror(current_role=True)]

                if len(segments) > 0:
                    logger.debug("Dispatching command to shutdown %s segments on host: %s" % (type, hostName))
                    cmd = gp.GpSegStopCmd("remote segment stop on host '%s'" % hostName,
                                          self.gphome, self.gpversion,
                                          mode='immediate', dbs=segments,
                                          verbose=logging_is_verbose(),
                                          ctxt=base.REMOTE, remoteHost=hostName)
                    self.pool.addCommand(cmd)

            if self.quiet:
                self.pool.join()
            else:
                base.join_and_indicate_progress(self.pool)

    ######
    def _print_segment_start(self, segmentStartResult, invalidSegments, willShutdownSegments):
        """
        Print the results of segment startup

        segmentStartResult is the StartSegmentsResult from the actual start
        invalidSegments are those that we didn't even try to start because they are marked as down or should otherwise
           not be started
        """
        started = len(segmentStartResult.getSuccessfulSegments())
        failed = len(segmentStartResult.getFailedSegmentObjs())
        totalTriedToStart = started + failed

        if failed or logging_is_verbose():
            logger.info("----------------------------------------------------")
            for failure in segmentStartResult.getFailedSegmentObjs():
                segment = failure.getSegment()
                logger.info("DBID:%d  FAILED  host:'%s' datadir:'%s' with reason:'%s'" % (
                    segment.getSegmentDbId(), segment.getSegmentHostName(),
                    segment.getSegmentDataDirectory(), failure.getReason()))
            for segment in segmentStartResult.getSuccessfulSegments():
                logger.debug("DBID:%d  STARTED" % segment.getSegmentDbId())
            logger.info("----------------------------------------------------\n\n")

        tableLog = TableLogger().setWarnWithArrows(True)

        tableLog.addSeparator()
        tableLog.info(["Successful segment starts", "= %d" % started])

        tableLog.infoOrWarn(failed, ["Failed segment starts", "= %d" % failed])

        tableLog.infoOrWarn(len(invalidSegments) > 0,
                            ["Skipped segment starts (segments are marked down in configuration)",
                             "= %d" % len(invalidSegments)])
        tableLog.addSeparator()
        tableLog.outputTable()

        attentionFlag = "<<<<<<<<" if started != totalTriedToStart else ""
        if len(invalidSegments) > 0:
            skippedMsg = ", skipped %s other segments" % len(invalidSegments)
        else:
            skippedMsg = ""
        logger.info("Successfully started %d of %d segment instances%s %s" %
                    (started, totalTriedToStart, skippedMsg, attentionFlag))
        logger.info("----------------------------------------------------")

        if failed:
            logger.warning("Segment instance startup failures reported")
            logger.warning("Failed start %d of %d segment instances %s" % \
                           (failed, totalTriedToStart, attentionFlag))
            logger.warning("Review %s" % get_logfile())
            if not willShutdownSegments:
                logger.warning("For more details on segment startup failure(s)")
                logger.warning("Run  gpstate -s  to review current segment instance status")

            logger.info("----------------------------------------------------")

        if len(invalidSegments) > 0:
            logger.warning("****************************************************************************")
            logger.warning("There are %d segment(s) marked down in the database" % len(invalidSegments))
            logger.warning("To recover from this current state, review usage of the gprecoverseg")
            logger.warning("management utility which will recover failed segment instance databases.")
            logger.warning("****************************************************************************")

    ######
    def _check_final_result(self, standbyFailure,
                            failedToStart, invalidSegments, msg):
        if standbyFailure:
            logger.warning("Standby Coordinator could not be started")
        if len(failedToStart) > 0:
            logger.warning("Number of segments which failed to start:  %d", len(failedToStart))

        if standbyFailure or len(failedToStart) > 0:
            return False

        if len(invalidSegments) > 0:
            logger.warning("Number of segments not attempted to start: %d", len(invalidSegments))

        if (len(failedToStart) > 0 or len(invalidSegments) > 0 or
                    msg is not None):
            logger.info("Check status of database with gpstate utility")
        else:
            logger.info("Database successfully started")

        return True

    ######
    def _start_final_coordinator(self):
        """ Last item in the startup sequence is to start the coordinator.

            After starting the coordinator we connect to it.  This is done both as a check that the system is
            actually started but its also done because certain backend processes don't get kickstarted
            until the first connection.  The DTM is an example of this and it allows those initialization
            messages to end up in the gpstart log as opposed to the user's psql session.
            Returns a tuple of (result[bool], message[string])
        """
        restrict_txt = ""
        if self.restricted:
            restrict_txt = "in RESTRICTED mode"

        logger.info("Starting Coordinator instance %s directory %s %s" % (
        self.gparray.coordinator.hostname, self.coordinator_datadir, restrict_txt))

        # attempt to start coordinator
        gp.CoordinatorStart.local("Starting Coordinator instance",
                             self.coordinator_datadir, self.port, self.era,
                             wrapper=self.wrapper, wrapper_args=self.wrapper_args,
                             specialMode=self.specialMode, restrictedMode=self.restricted, timeout=self.timeout,
                             max_connections=self.max_connections
                             )

        # check that coordinator is running now
        if not pg.DbStatus.local('coordinator instance', self.gparray.coordinator):
            logger.warning("Command pg_ctl reports Coordinator %s on port %d not running" % (
            self.gparray.coordinator.datadir, self.gparray.coordinator.port))
            logger.warning("Coordinator could not be started")
            return False, None

        logger.info("Command pg_ctl reports Coordinator %s instance active" % self.gparray.coordinator.hostname)

        msg = None
        try:
            self.dburl.retries = 4
            self.dburl.timeout = 15
            coordinatorconn = dbconn.connect(self.dburl)
            coordinatorconn.close()

        except Exception as e:
            # MPP-14016.  While certain fts scenarios will trigger initial connection failures
            # we still need watch for PANIC events.
            msg = str(e)
            if 'PANIC' in msg:
                logger.critical(msg)
                return False
            logger.warning(msg)

        # set the era we used when starting the segments
        e = GpEraFile(self.coordinator_datadir, logger=get_logger_if_verbose())
        e.set_era(self.era)

        return True, msg

    ######
    def _start_standby(self):
        """ used to start the standbycoordinator if necessary.

            returns if the standby coordinator was started or not
        """
        if self.start_standby and self.gparray.standbyCoordinator is not None:
            try:
                self.attempt_standby_start = True
                host = self.gparray.standbyCoordinator.hostname
                datadir = self.gparray.standbyCoordinator.datadir
                port = self.gparray.standbyCoordinator.port
                return gp.start_standbycoordinator(host, datadir, port, era=self.era,
                                              wrapper=self.wrapper,
                                              wrapper_args=self.wrapper_args)
            except base.ExecutionError as e:
                logger.warning("Error occured while starting the standby coordinator: %s" % e)
                return False
        else:
            logger.info("No standby coordinator configured.  skipping...")
            return False

    # ----------------------- Command line option parser ----------------------
    @staticmethod
    def createParser():
        parser = OptParser(option_class=OptChecker,
                           description="Starts a CBDB Array.",
                           version='%prog version $Revision$')
        parser.setHelp([])

        addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=True)

        addTo = OptionGroup(parser, 'Connection options')
        parser.add_option_group(addTo)
        addCoordinatorDirectoryOptionForSingleClusterProgram(addTo)

        addTo = OptionGroup(parser, 'Database startup options: ')
        parser.add_option_group(addTo)
        addTo.add_option('-U', '--specialMode', type='choice', choices=['maintenance'],
                         metavar='maintenance', action='store', default=None,
                         help=SUPPRESS_HELP)
        addTo.add_option('-m', '--master_only', '-c', '--coordinator_only', dest="coordinator_only", action='store_true',
                         help='start coordinator instance only in maintenance mode')
        addTo.add_option('-y', '--no_standby', dest="start_standby", action='store_false', default=True,
                         help='do not start coordinator standby server')
        addTo.add_option('-B', '--parallel', type="int", default=gp.DEFAULT_GPSTART_NUM_WORKERS, metavar="<parallel_processes>",
                         help='number of segment hosts to run in parallel. Default is %d' % gp.DEFAULT_GPSTART_NUM_WORKERS)
        addTo.add_option('-R', '--restricted', action='store_true',
                         help='start in restricted mode. Only users with superuser privilege are allowed to connect.')
        addTo.add_option('-t', '--timeout', dest='timeout', default=SEGMENT_TIMEOUT_DEFAULT, type='int',
                         help='time to wait for segment startup (in seconds)')
        addTo.add_option('', '--wrapper', dest="wrapper", default=None, type='string')
        addTo.add_option('', '--wrapper-args', dest="wrapper_args", default=None, type='string')
        addTo.add_option('-S', '--skip_standby_check', dest="skip_standby_check", action='store_true', default=False)
        addTo.add_option('--skip-heap-checksum-validation', dest='skip_heap_checksum_validation',
                         action='store_true', default=False, help='Skip the validation of data_checksums GUC. '
                                                                  'Note: Starting up the cluster without this '
                                                                  'validation could lead to data loss.')
        addTo.add_option('-F', dest='fts_hosts', type='string',default=None ,
                         help='specify the file that contains all fts hosts.If this argument is set, `gpstart` will attempt'
                                'to start all fts in the specified hosts, if not, `gpstart` will start all fts spectified in the'
                                '$COORDINATOR_DATA_DIRECTORY/config/fts_host')
        addTo.add_option('-E', dest='etcd_hosts', type='string',default=None ,
                         help='specify the file that contains all etcd hosts.If this argument is set, `gpstart` will attempt'
                                'to start all etcd in the specified hosts')

        parser.set_defaults(verbose=False, filters=[], slice=(None, None))

        return parser

    @staticmethod
    def createProgram(options, args):
        logfileDirectory = options.ensure_value("logfileDirectory", False)
        proccount = os.environ.get('GP_MGMT_PROCESS_COUNT')
        external_fts = is_external_fts()
        if options.parallel == 64 and proccount is not None:
            options.parallel = int(proccount)
        
        if external_fts and (options.fts_hosts or options.etcd_hosts):
            ProgramArgumentValidationException("internal fts not suopport -F and -E")

        # -n sanity check
        if options.parallel > 128 or options.parallel < 1:
            raise ProgramArgumentValidationException("Invalid value for parallel degree: %s" % options.parallel)

        if args:
            raise ProgramArgumentValidationException(
                "Argument %s is invalid.  Is an option missing a parameter?" % args[-1])
        
        if is_external_fts:
            if options.fts_hosts is None:
                coordinator_data_directory = gp.get_coordinatordatadir()
                options.fts_hosts = coordinator_data_directory + '/config' + '/fts_host'

        return GpStart(options.specialMode, options.restricted,
                       options.start_standby,
                       coordinator_datadir=options.coordinatorDataDirectory,
                       parallel=options.parallel,
                       quiet=options.quiet,
                       coordinatoronly=options.coordinator_only,
                       interactive=options.interactive,
                       timeout=options.timeout,
                       wrapper=options.wrapper,
                       wrapper_args=options.wrapper_args,
                       skip_standby_check=options.skip_standby_check,
                       logfileDirectory=logfileDirectory,
                       skip_heap_checksum_validation=options.skip_heap_checksum_validation,
                       fts_hosts=options.fts_hosts,
                       etcd_hosts=options.etcd_hosts,
                       is_external_fts=external_fts
                       )


if __name__ == '__main__':
    simple_main(GpStart.createParser, GpStart.createProgram)